stop_machine stops the whole machine and, in that state, has one CPU (or a specified set of CPUs) execute a given function. It can be viewed as a very heavy lock whose read-side critical sections are all code regions that run with preemption disabled (a spinlock critical section, for instance, runs with preemption disabled).
The call schedules a kernel thread on every CPU; each thread monopolizes its CPU and disables interrupts. Once every CPU is occupied with interrupts disabled, one CPU (or the specified set of CPUs) runs the function pointer passed in as a parameter. While that function runs, no CPU holds a spinlock or sits inside a preemption-disabled region.
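As a quick illustration of the calling convention (a minimal sketch; example_fn and example_caller are made up for this post, only the stop_machine() signature comes from the kernel code quoted below):

/* Hypothetical callback: it runs while every online CPU is stopped with
 * interrupts disabled, so it must not sleep, take sleeping locks, or wait
 * for other CPUs to make progress. */
static int example_fn(void *data)
{
        int *value = data;

        *value = 42;            /* e.g. patch some global state atomically */
        return 0;
}

static int example_caller(void)
{
        int value = 0;

        /* cpus == NULL: only the first online CPU runs example_fn,
         * the others just spin with interrupts disabled */
        return stop_machine(example_fn, &value, NULL);
}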

Kernel version for the code: 3.10.0-862.el7.x86_64. Only the configuration with CONFIG_STOP_MACHINE and CONFIG_SMP enabled is considered.

For background on kernel threads, see the post "Linux kernel thread 内核线程".

Relationship diagram

stop_machine mainly involves the following four related parts:

  • The smp_hotplug_thread structure and percpu threads
    This structure describes a CPU-hotplug-aware percpu thread. Once an instance of it is initialized, a kernel thread bound to each CPU is created for every CPU.
  • cpu_stop_threads and the migration threads
    cpu_stop_threads is an instance of smp_hotplug_thread; the percpu threads it describes are the migration threads.
  • stop_cpus and the cpu_stop_work structure
    The stop_cpus function uses cpu_stop_work and the migration threads to make a specified set of CPUs execute a given function with preemption disabled.
  • stop_machine, multi_cpu_stop and the multi_stop_data structure
    stop_machine uses stop_cpus to run multi_cpu_stop on all online CPUs with preemption disabled, passing a multi_stop_data instance, msdata, as the argument. multi_cpu_stop occupies every online CPU and uses a simple state machine to synchronize them into an interrupts-disabled state; in that state, one designated CPU (or several) executes the specified function.

The diagram below shows how the functions and structure instances relate to each other; the numeric labels describe the temporal (i.e. logical) order:

  1. cpu_stop_init initializes the migration threads
  2. The migration threads wait inside smpboot_thread_fn
  3. stop_machine is called
  4. The migration thread runs into cpu_stopper_thread
  5. The migration thread goes on to run multi_cpu_stop
  6. stop_machine finishes and returns

stop_machine

Related structures and variables

cpu_stopper is a percpu variable: each CPU has its own chunk of memory, and every percpu variable sits at a fixed offset inside each chunk.
For example, the store member of the smp_hotplug_thread instance holds the offset of cpu_stopper.thread within each per-CPU memory area.
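A minimal sketch of how such an offset is resolved (illustrative only; stopper_task_of is a made-up helper, per_cpu_ptr/per_cpu are the standard kernel accessors):

/* &cpu_stopper.thread is the address of the member inside the per-cpu
 * template section; per_cpu_ptr() adds the target CPU's per-cpu offset to it
 * to reach that CPU's private copy. This is exactly how ht->store is used in
 * __smpboot_create_thread() further below. */
static struct task_struct *stopper_task_of(unsigned int cpu)
{
        struct task_struct **store = &cpu_stopper.thread;

        return *per_cpu_ptr(store, cpu);        /* same as per_cpu(cpu_stopper.thread, cpu) */
}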

cpu_stopper
/* the actual stopper, one per every possible cpu, enabled on online cpus */
struct cpu_stopper {
        /* kernel thread for this cpu, named "migration/<cpu number>" */
        struct task_struct      *thread;

        spinlock_t              lock;
        bool                    enabled;        /* is this stopper enabled? */
        /* list of works queued for execution, protected by lock above */
        struct list_head        works;          /* list of pending works */

        /* used by stop_cpus to queue onto the works list, protected by stop_cpus_mutex */
        struct cpu_stop_work    stop_work;      /* for stop_cpus */
};

static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper);

The smp_hotplug_thread structure describes a CPU-hotplug-aware thread: one kernel thread per CPU is created or destroyed as CPUs come and go.
The migration thread used here is one example; others are ksoftirqd, watchdog, and so on.
The various callbacks below are invoked at the corresponding stages of a percpu thread's life cycle.

cpu_stop_threads
static struct smp_hotplug_thread cpu_stop_threads = {
        .store                  = &cpu_stopper.thread,
        .thread_should_run      = cpu_stop_should_run,
        .thread_fn              = cpu_stopper_thread,
        .thread_comm            = "migration/%u",
        .create                 = cpu_stop_create,
        .setup                  = cpu_stop_unpark,
        .park                   = cpu_stop_park,
        .pre_unpark             = cpu_stop_unpark,
        .selfparking            = true,
};
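The create, setup/pre_unpark and park callbacks are referenced repeatedly below but not quoted; in this kernel version they look roughly like this (cpu_stop_create registers the thread as the scheduler's per-CPU stop task, cpu_stop_unpark and cpu_stop_park essentially just flip cpu_stopper.enabled):

static void cpu_stop_create(unsigned int cpu)
{
        /* tell the scheduler that this task is the stop task of @cpu */
        sched_set_stop_task(cpu, per_cpu(cpu_stopper.thread, cpu));
}

static void cpu_stop_unpark(unsigned int cpu)
{
        struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);

        /* once enabled, queued works are accepted and executed */
        spin_lock_irq(&stopper->lock);
        stopper->enabled = true;
        spin_unlock_irq(&stopper->lock);
}

static void cpu_stop_park(unsigned int cpu)
{
        struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
        struct cpu_stop_work *work;
        unsigned long flags;

        /* drain remaining works: signal them as not executed, disable the stopper */
        spin_lock_irqsave(&stopper->lock, flags);
        list_for_each_entry(work, &stopper->works, list)
                cpu_stop_signal_done(work->done, false);
        stopper->enabled = false;
        spin_unlock_irqrestore(&stopper->lock, flags);
}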

The fn, data and cpus parameters of stop_machine are stored into the fn, data and active_cpus members of msdata.

multi_stop_data (the variable is usually named msdata)
struct multi_stop_data {
        int                     (*fn)(void *);
        void                    *data;
        /* Like num_online_cpus(), but hotplug cpu uses us, so we need this. */
        /* number of cpus to stop; multi_cpu_stop's simple state machine uses it
         * to reset thread_ack after every state transition */
        unsigned int            num_threads;
        /* mask of the cpus that should run fn; this is not the same as the
         * set of cpus being stopped */
        const struct cpumask    *active_cpus;

        /* current state of the state machine */
        enum multi_stop_state   state;
        /* number of cpus still in the current state; the last cpu to pass a
         * state advances the state and resets thread_ack */
        atomic_t                thread_ack;
};
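The helpers that drive state and thread_ack are not quoted elsewhere in this post; in this kernel version they look roughly like this:

/* Reset the ack counter to the number of stopped cpus, then publish the new
 * state; every stopper thread polls msdata->state inside multi_cpu_stop(). */
static void set_state(struct multi_stop_data *msdata,
                      enum multi_stop_state newstate)
{
        /* Reset ack counter. */
        atomic_set(&msdata->thread_ack, msdata->num_threads);
        smp_wmb();
        msdata->state = newstate;
}

/* Last one to ack a state moves to the next state. */
static void ack_state(struct multi_stop_data *msdata)
{
        if (atomic_dec_and_test(&msdata->thread_ack))
                set_state(msdata, msdata->state + 1);
}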

migration thread initialization

The initialization needed by stop_machine is done by cpu_stop_init.

cpu_stop_init
static int __init cpu_stop_init(void)
{
        unsigned int cpu;

        /* walk every possible CPU and initialize its per-cpu cpu_stopper */
        for_each_possible_cpu(cpu) {
                struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);

                spin_lock_init(&stopper->lock);
                INIT_LIST_HEAD(&stopper->works);
        }
        /* register the percpu threads and start one per CPU;
         * later CPU hotplug events are handled accordingly */
        BUG_ON(smpboot_register_percpu_thread(&cpu_stop_threads));
        /* mark the percpu threads needed by stop_machine as initialized */
        stop_machine_initialized = true;
        return 0;
}
early_initcall(cpu_stop_init);

The actual implementation that registers the percpu threads and starts one per CPU:

smpboot_register_percpu_thread
/**
 * smpboot_register_percpu_thread - Register a per_cpu thread related to hotplug
 * @plug_thread:        Hotplug thread descriptor
 *
 * Creates and starts the threads on all online cpus.
 */
int smpboot_register_percpu_thread(struct smp_hotplug_thread *plug_thread)
{
        unsigned int cpu;
        int ret = 0;

        if (!alloc_cpumask_var(&plug_thread->cpumask, GFP_KERNEL))
                return -ENOMEM;
        cpumask_copy(plug_thread->cpumask, cpu_possible_mask);

        /* temporarily block CPU hotplug */
        get_online_cpus();
        mutex_lock(&smpboot_threads_lock);
        for_each_online_cpu(cpu) {
                /* Create the thread for this CPU; its task_struct address is written to
                 * the location given by store, as described above. Inside, the thread's
                 * kthread structure is flagged KTHREAD_IS_PER_CPU and its CPU number is
                 * recorded. If the create callback exists it is called; here that is
                 * cpu_stop_create. */
                ret = __smpboot_create_thread(plug_thread, cpu);
                if (ret) {
                        smpboot_destroy_threads(plug_thread);
                        goto out;
                }
                /* Call the pre_unpark callback if present; here that is cpu_stop_unpark,
                 * which simply sets cpu_stopper.enabled to true.
                 * Then call kthread_unpark: based on KTHREAD_IS_PER_CPU and the recorded
                 * CPU number, it binds the kernel thread to its CPU and clears the park
                 * flag. Finally the thread is woken into TASK_RUNNING and continues in
                 * the function passed at creation time, i.e. smpboot_thread_fn. */
                smpboot_unpark_thread(plug_thread, cpu);
        }
        /* Add this percpu thread descriptor to the hotplug_threads list so that later
         * CPU hotplug events can operate on these threads. */
        list_add(&plug_thread->list, &hotplug_threads);
out:
        mutex_unlock(&smpboot_threads_lock);
        /* done manipulating percpu threads, CPU hotplug is allowed again */
        put_online_cpus();
        return ret;
}
__smpboot_create_thread
static int __smpboot_create_thread(struct smp_hotplug_thread *ht, unsigned int cpu)
{
        struct task_struct *tsk = *per_cpu_ptr(ht->store, cpu);
        struct smpboot_thread_data *td;

        if (tsk)
                return 0;

        td = kzalloc_node(sizeof(*td), GFP_KERNEL, cpu_to_node(cpu));
        if (!td)
                return -ENOMEM;
        td->cpu = cpu;
        td->ht = ht;

        /* Creates the kernel thread via kthread_create_on_node internally,
         * flags the thread's kthread structure with KTHREAD_IS_PER_CPU and records
         * its CPU, and puts the thread into TASK_PARKED state.
         * smpboot_thread_fn has not started executing at this point. */
        tsk = kthread_create_on_cpu(smpboot_thread_fn, td, cpu,
                                    ht->thread_comm);
        if (IS_ERR(tsk)) {
                kfree(td);
                return PTR_ERR(tsk);
        }
        get_task_struct(tsk);
        /* store the thread's task_struct at the per-cpu location */
        *per_cpu_ptr(ht->store, cpu) = tsk;
        /* Call the create callback if present; here that is cpu_stop_create,
         * which is scheduler related (it registers this task as the per-CPU stop task). */
        if (ht->create) {
                /*
                 * Make sure that the task has actually scheduled out
                 * into park position, before calling the create
                 * callback. At least the migration thread callback
                 * requires that the task is off the runqueue.
                 */
                if (!wait_task_inactive(tsk, TASK_PARKED))
                        WARN_ON(1);
                else
                        ht->create(cpu);
        }
        return 0;
}

get_online_cpus blocks CPU hotplug operations until the matching put_online_cpus.

get_online_cpus
void get_online_cpus(void)
{
        /* if called from atomic (non-schedulable) context this warns with a stack
         * trace; it may also voluntarily reschedule */
        might_sleep();
        if (cpu_hotplug.active_writer == current)
                return;
        cpuhp_lock_acquire_read();
        mutex_lock(&cpu_hotplug.lock);
        /* take a reference so that CPU hotplug cannot proceed in the meantime */
        cpu_hotplug.refcount++;
        mutex_unlock(&cpu_hotplug.lock);
}
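Its counterpart put_online_cpus drops the reference and, if a hotplug writer is waiting for readers to drain, wakes it up; in this kernel version it looks roughly like this:

void put_online_cpus(void)
{
        if (cpu_hotplug.active_writer == current)
                return;
        mutex_lock(&cpu_hotplug.lock);

        if (WARN_ON(!cpu_hotplug.refcount))
                cpu_hotplug.refcount++;         /* try to fix things up */

        /* last reader gone: wake up a waiting hotplug writer, if any */
        if (!--cpu_hotplug.refcount && unlikely(cpu_hotplug.active_writer))
                wake_up_process(cpu_hotplug.active_writer);
        mutex_unlock(&cpu_hotplug.lock);
        cpuhp_lock_release();
}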

smpboot_thread_fn is a loop that checks for the various conditions it has to handle. When there is percpu-thread work to do, it calls the thread_fn callback of the percpu thread descriptor.

smpboot_thread_fn
/**
 * smpboot_thread_fn - percpu hotplug thread loop function
 * @data:       thread data pointer
 *
 * Checks for thread stop and park conditions. Calls the necessary
 * setup, cleanup, park and unpark functions for the registered
 * thread.
 *
 * Returns 1 when the thread should exit, 0 otherwise.
 */
static int smpboot_thread_fn(void *data)
{
        struct smpboot_thread_data *td = data;
        struct smp_hotplug_thread *ht = td->ht;

        while (1) {
                set_current_state(TASK_INTERRUPTIBLE);
                preempt_disable();
                if (kthread_should_stop()) {
                        __set_current_state(TASK_RUNNING);
                        preempt_enable();
                        if (ht->cleanup)
                                ht->cleanup(td->cpu, cpu_online(td->cpu));
                        kfree(td);
                        return 0;
                }

                if (kthread_should_park()) {
                        __set_current_state(TASK_RUNNING);
                        preempt_enable();
                        if (ht->park && td->status == HP_THREAD_ACTIVE) {
                                BUG_ON(td->cpu != smp_processor_id());
                                ht->park(td->cpu);
                                td->status = HP_THREAD_PARKED;
                        }
                        kthread_parkme();
                        /* We might have been woken for stop */
                        continue;
                }

                BUG_ON(td->cpu != smp_processor_id());

                /* Check for state change setup */
                switch (td->status) {
                /* initial state */
                case HP_THREAD_NONE:
                        __set_current_state(TASK_RUNNING);
                        preempt_enable();
                        /* here cpu_stop_threads.setup is cpu_stop_unpark */
                        if (ht->setup)
                                ht->setup(td->cpu);
                        td->status = HP_THREAD_ACTIVE;
                        continue;

                case HP_THREAD_PARKED:
                        __set_current_state(TASK_RUNNING);
                        preempt_enable();
                        if (ht->unpark)
                                ht->unpark(td->cpu);
                        td->status = HP_THREAD_ACTIVE;
                        continue;
                }

                /* we are in HP_THREAD_ACTIVE state here;
                 * cpu_stop_threads.thread_should_run is cpu_stop_should_run */
                if (!ht->thread_should_run(td->cpu)) {
                        /* this CPU's cpu_stopper.works list is empty, nothing to do:
                         * schedule away in TASK_INTERRUPTIBLE state, give up the CPU */
                        preempt_enable_no_resched();
                        schedule();
                } else {
                        /* this CPU's cpu_stopper.works list is not empty, there is work:
                         * switch to TASK_RUNNING and call the thread_fn callback;
                         * cpu_stop_threads.thread_fn is cpu_stopper_thread */
                        __set_current_state(TASK_RUNNING);
                        preempt_enable();
                        ht->thread_fn(td->cpu);
                }
        }
}
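cpu_stop_should_run is not quoted elsewhere in this post; in this kernel version it simply checks, under the stopper lock, whether this CPU's cpu_stopper.works list is non-empty (roughly):

static int cpu_stop_should_run(unsigned int cpu)
{
        struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
        unsigned long flags;
        int run;

        spin_lock_irqsave(&stopper->lock, flags);
        run = !list_empty(&stopper->works);     /* any pending cpu_stop_work? */
        spin_unlock_irqrestore(&stopper->lock, flags);
        return run;
}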

The stop_machine call

A kernel thread is scheduled on every CPU, and each of them disables interrupts. As a result, while the function pointer parameter fn runs, no CPU holds a spinlock or is inside a preemption-disabled region.

stop_machine
int stop_machine(int (*fn)(void *), void *data, const struct cpumask *cpus)
{
        int ret;

        /* No CPUs can come up or down during this. */
        /* temporarily block CPU hotplug */
        get_online_cpus();
        /* the stop_machine implementation that does not have to worry about CPU hotplug */
        ret = __stop_machine(fn, data, cpus);
        /* allow CPU hotplug again */
        put_online_cpus();
        return ret;
}

Some callers have already called get_online_cpus themselves, which is why __stop_machine is split out as a separate function.

__stop_machine
int __stop_machine(int (*fn)(void *), void *data, const struct cpumask *cpus)
{
        /* msdata lives on the stack; multi_cpu_stop receives its pointer as argument */
        struct multi_stop_data msdata = {
                .fn = fn,
                .data = data,
                .num_threads = num_online_cpus(),
                .active_cpus = cpus,
        };

        /* If called before cpu_stop_init has finished, just run fn with interrupts
         * disabled. Normally this branch is not taken. */
        if (!stop_machine_initialized) {
                /*
                 * Handle the case where stop_machine() is called
                 * early in boot before stop_machine() has been
                 * initialized.
                 */
                unsigned long flags;
                int ret;

                WARN_ON_ONCE(msdata.num_threads != 1);

                local_irq_save(flags);
                hard_irq_disable();
                ret = (*fn)(data);
                local_irq_restore(flags);

                return ret;
        }

        /* Set the initial state and stop all online cpus. */
        /* initialize the msdata state machine */
        set_state(&msdata, MULTI_STOP_PREPARE);
        /* stop_cpus is passed cpu_online_mask: stop all online cpus' other work and
         * have them run multi_cpu_stop with msdata as the argument */
        return stop_cpus(cpu_online_mask, multi_cpu_stop, &msdata);
}
multi_stop_state
/* This controls the threads on each CPU. */
enum multi_stop_state {
        /* Dummy starting state for thread. */
        MULTI_STOP_NONE,
        /* Awaiting everyone to be scheduled. */
        MULTI_STOP_PREPARE,
        /* Disable interrupts. */
        MULTI_STOP_DISABLE_IRQ,
        /* Run the function */
        MULTI_STOP_RUN,
        /* Exit */
        MULTI_STOP_EXIT,
};
stop_cpus
int stop_cpus(const struct cpumask *cpumask, cpu_stop_fn_t fn, void *arg)
{
        int ret;

        /* static works are used, process one request at a time */
        /* Only one stop_cpus may run at a time: queue_stop_cpus_work below uses the
         * static cpu_stopper.stop_work and inserts it into the cpu_stopper.works
         * list, and stop_work must not be reused until it has completed. */
        mutex_lock(&stop_cpus_mutex);
        ret = __stop_cpus(cpumask, fn, arg);
        mutex_unlock(&stop_cpus_mutex);
        return ret;
}
__stop_cpus
static int __stop_cpus(const struct cpumask *cpumask,
                       cpu_stop_fn_t fn, void *arg)
{
        struct cpu_stop_done done;

        /* initialize the on-stack done structure: a count of cpus that still have
         * to run the work, plus a completion */
        cpu_stop_init_done(&done, cpumask_weight(cpumask));
        /* for each cpu to be stopped, set up its cpu_stopper.stop_work and insert it
         * into the pending list cpu_stopper.works */
        queue_stop_cpus_work(cpumask, fn, arg, &done);
        /* wait on the completion */
        wait_for_completion(&done.completion);
        return done.executed ? done.ret : -ENOENT;
}
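cpu_stop_done, cpu_stop_init_done and cpu_stop_signal_done (used further below) are not quoted elsewhere in this post; in this kernel version they look roughly like this:

struct cpu_stop_done {
        atomic_t                nr_todo;        /* nr left to execute */
        bool                    executed;       /* actually executed? */
        int                     ret;            /* collected return value */
        struct completion       completion;     /* fired if nr_todo reaches 0 */
};

static void cpu_stop_init_done(struct cpu_stop_done *done, unsigned int nr_todo)
{
        memset(done, 0, sizeof(*done));
        atomic_set(&done->nr_todo, nr_todo);
        init_completion(&done->completion);
}

/* signal completion unless @done is NULL */
static void cpu_stop_signal_done(struct cpu_stop_done *done, bool executed)
{
        if (done) {
                if (executed)
                        done->executed = true;
                /* the last stopper to finish wakes up the waiter */
                if (atomic_dec_and_test(&done->nr_todo))
                        complete(&done->completion);
        }
}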
queue_stop_cpus_work
static void queue_stop_cpus_work(const struct cpumask *cpumask,
                                 cpu_stop_fn_t fn, void *arg,
                                 struct cpu_stop_done *done)
{
        struct cpu_stop_work *work;
        unsigned int cpu;

        /*
         * Disable preemption while queueing to avoid getting
         * preempted by a stopper which might wait for other stoppers
         * to enter @fn which can lead to deadlock.
         */
        /* Preemption is disabled before queueing work to each cpu to be stopped, so
         * that the current context cannot be preempted half-way through, which would
         * deadlock the cpus that have already started executing the work.
         * The percpu spinlocks are taken in a fixed order, presumably so that when
         * several requests are queued they appear in the same order on every cpu. */
        lg_global_lock(&stop_cpus_lock);
        for_each_cpu(cpu, cpumask) {
                work = &per_cpu(cpu_stopper.stop_work, cpu);
                work->fn = fn;
                work->arg = arg;
                work->done = done;
                cpu_stop_queue_work(cpu, work);
        }
        lg_global_unlock(&stop_cpus_lock);
}
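cpu_stop_queue_work is what actually appends the work to the per-cpu list and wakes the migration thread out of schedule() in smpboot_thread_fn; in this kernel version it looks roughly like this:

static void cpu_stop_queue_work(unsigned int cpu, struct cpu_stop_work *work)
{
        struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
        unsigned long flags;

        spin_lock_irqsave(&stopper->lock, flags);

        if (stopper->enabled) {
                /* append to cpu_stopper.works and wake the migration thread */
                list_add_tail(&work->list, &stopper->works);
                wake_up_process(stopper->thread);
        } else
                /* stopper disabled (cpu going down): report the work as not executed */
                cpu_stop_signal_done(work->done, false);

        spin_unlock_irqrestore(&stopper->lock, flags);
}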

Back in the migration thread

When there is work to do, smpboot_thread_fn makes the migration thread call cpu_stop_threads.thread_fn, i.e. cpu_stopper_thread.

cpu_stopper_thread
static void cpu_stopper_thread(unsigned int cpu)
{
        struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
        struct cpu_stop_work *work;
        int ret;

repeat:
        work = NULL;
        /* if the cpu_stopper.works list is not empty, take its first entry */
        spin_lock_irq(&stopper->lock);
        if (!list_empty(&stopper->works)) {
                work = list_first_entry(&stopper->works,
                                        struct cpu_stop_work, list);
                list_del_init(&work->list);
        }
        spin_unlock_irq(&stopper->lock);

        if (work) {
                cpu_stop_fn_t fn = work->fn;
                void *arg = work->arg;
                struct cpu_stop_done *done = work->done;
                char ksym_buf[KSYM_NAME_LEN] __maybe_unused;

                /* cpu stop callbacks are not allowed to sleep */
                preempt_disable();

                /* run fn with preemption disabled; here fn is multi_cpu_stop as set up
                 * in __stop_machine, and arg is the on-stack msdata */
                ret = fn(arg);
                if (ret)
                        done->ret = ret;

                /* restore preemption and check it's still balanced */
                preempt_enable();
                WARN_ONCE(preempt_count(),
                          "cpu_stop: %s(%p) leaked preempt count\n",
                          kallsyms_lookup((unsigned long)fn, NULL, NULL, NULL,
                                          ksym_buf), arg);

                cpu_stop_signal_done(done, true);
                goto repeat;
        }
}

The fn passed to stop_machine is called by multi_cpu_stop with interrupts disabled; the whole machine is stopped inside multi_cpu_stop.

After multi_cpu_stop returns, the last migration thread to call cpu_stop_signal_done in cpu_stopper_thread completes the completion; the thread waiting inside stop_machine is woken up and returns.

multi_cpu_stop
/* This is the cpu_stop function which stops the CPU. */
static int multi_cpu_stop(void *data)
{
        struct multi_stop_data *msdata = data;
        enum multi_stop_state curstate = MULTI_STOP_NONE;
        int cpu = smp_processor_id(), err = 0;
        unsigned long flags;
        bool is_active;

        /*
         * When called from stop_machine_from_inactive_cpu(), irq might
         * already be disabled. Save the state and restore it on exit.
         */
        local_save_flags(flags);

        /* Decide which cpus run msdata->fn, based on the parameter stop_machine received:
         * if active_cpus is NULL, only the first online cpu runs fn;
         * otherwise every cpu in the mask runs fn. */
        if (!msdata->active_cpus)
                is_active = cpu == cpumask_first(cpu_online_mask);
        else
                is_active = cpumask_test_cpu(cpu, msdata->active_cpus);

        /* Simple state machine */
        /* A simple state machine that synchronizes the work phases of all cpus being
         * stopped. Only after every stopped cpu has passed through a state does the
         * machine advance; the last cpu to be counted updates msdata.
         * First comes the prepare state; then the disable-interrupts state, where every
         * cpu entering it disables interrupts; then the run state, where is_active
         * (decided above) determines whether this cpu runs fn, the function passed to
         * stop_machine; finally the exit state. */
        do {
                /* Chill out and ensure we re-read multi_stop_state. */
                cpu_relax();
                if (msdata->state != curstate) {
                        curstate = msdata->state;
                        switch (curstate) {
                        case MULTI_STOP_DISABLE_IRQ:
                                local_irq_disable();
                                hard_irq_disable();
                                break;
                        case MULTI_STOP_RUN:
                                if (is_active)
                                        err = msdata->fn(msdata->data);
                                break;
                        default:
                                break;
                        }
                        /* count down and, on the last cpu, switch to the next state */
                        ack_state(msdata);
                } else if (curstate > MULTI_STOP_PREPARE) {
                        /*
                         * At this stage all other CPUs we depend on must spin
                         * in the same loop. Any reason for hard-lockup should
                         * be detected and reported on their side.
                         */
                        touch_nmi_watchdog();
                }
        } while (curstate != MULTI_STOP_EXIT);

        local_irq_restore(flags);
        return err;
}