惊群现象是在多进程或者多线程场景下,多个进程或者多个线程在同一条件下睡眠,当唤醒条件发生的时候,会同时唤醒这些睡眠的进程或者线程,但是只有一个是可以成功执行的,而其他的进程或者线程被唤醒后存在着执行开销的浪费。

Linux中惊群的触发

  • 多个进程或者线程在获取同一把锁的时候睡眠
  • 多个进程或者线程同时进行accept
  • 多个进程在同一个epoll上竞争
  • 多个进程在多个epoll上对于同一个监听的socket进行accept

Linux中的解决方案

通用解决方案

Linux提供了wake_up_process方法,用于唤醒一个指定的进程。

1
2
3
4
int wake_up_process(struct task_struct *p)
{
    return try_to_wake_up(p, TASK_ALL, 0);
}

Linux系统中进程唤醒的流程为,当有多个进程同时睡眠的时候,Linux通过工作队列的方式维护这些睡眠的进程。在进程睡眠之前,会先进行一个操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
void
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
    unsigned long flags;

    wait->flags &= ~WQ_FLAG_EXCLUSIVE;
    spin_lock_irqsave(&q->lock, flags);
    if (list_empty(&wait->task_list))
        __add_wait_queue(q, wait);
    set_current_state(state);
    spin_unlock_irqrestore(&q->lock, flags);
}

prepare_to_wait函数主要是将当前的flags加上了WQ_FLAG_EXCLUSIVE标志,该标志意味着进程想要被独占地唤醒,然后放入到工作队列的尾部(如果含有独占标志的进程并不位于队列尾部,将导致其后的不含有该标志的进程无法执行),最后设置相应状态,如TASK_INTERUPTIBLE表示可以被wake_up唤醒。当需要唤醒进程的时候,Linux提供了__wake_up_common方法唤醒工作队列中的进程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
 * The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just
 * wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve
 * number) then we wake all the non-exclusive tasks and one exclusive task.
 *
 * There are circumstances in which we can try to wake a task which has already
 * started to run but is not in state TASK_RUNNING. try_to_wake_up() returns
 * zero in this (rare) case, and we handle it by continuing to scan the queue.
 */
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;

        if (curr->func(curr, mode, wake_flags, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}

__wake_up_common函数会遍历工作队列,寻找flags中含有WQ_FLAG_EXCLUSIVE标志的进程,如果没有设置独占标志,则根据mode唤醒每个睡眠的进程。nr_exclusiv表示需要唤醒的设置了独占标志进程的数目,当nr_exclusive减为零的时候,就会跳出循环,它在wake_up中设置为1,表明当处理了一个含有WQ_FLAG_EXCLUSIVE标志进程后,将不再处理,所以只能唤醒nr_exclusive个进程。

其中回调函数curr->func的回调函数是通过DEFINE_WAIT(wait)宏定义的:

1
2
3
4
5
6
7
8
#define DEFINE_WAIT_FUNC(name, function)                \
    wait_queue_t name = {                               \
        .private    = current,                          \
        .func       = function,                         \
        .task_list  = LIST_HEAD_INIT((name).task_list), \
    }

#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)

通过上面的代码可以发现回调函数为autoremove_wake_function

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
//kernel/wait.c
int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    int ret = default_wake_function(wait, mode, sync, key);

    if (ret)
        list_del_init(&wait->task_list);
    return ret;
}

//kernel/sched.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
              void *key)
{
    return try_to_wake_up(curr->private, mode, wake_flags);
}

autoremove_wake_function最终通过default_wake_function调用try_to_wake_up实现唤醒指定进程。

socket accept场景下的惊群方案

在Linux中当对服务器监听的socket进行accept操作时,假如没有新的accept事件则会进行睡眠。sys_accept调用最终会在TCP层执行inet_csk_accept函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
struct sock *inet_csk_accept(struct sock *sk, int flags, int *err)
{
    struct inet_connection_sock *icsk = inet_csk(sk);
    struct sock *newsk;
    int error;

    lock_sock(sk);
...
    //阻塞等待直到有全连接建立。如果用户设置了等待超时时间,超时后会退出
        error = inet_csk_wait_for_connect(sk, timeo);
...
out:
    release_sock(sk);
...
}

其中inet_csk_accept在等待accept连接到来的时候,会执行inet_csk_wait_for_connect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
/*
 * Wait for an incoming connection, avoid race conditions. This must be called
 * with the socket locked.
 */
static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
    struct inet_connection_sock *icsk = inet_csk(sk);
    DEFINE_WAIT(wait);
    ...
    for (;;) {
        prepare_to_wait_exclusive(sk_sleep(sk), &wait,
                      TASK_INTERRUPTIBLE);
        ...
        if (reqsk_queue_empty(&icsk->icsk_accept_queue))
            timeo = schedule_timeout(timeo);
        ...
    }
    ...
}

其中函数prepare_to_wait_exclusive的作用与之前的例子一样,此处的DEFINE_WAIT(wait)将当前的进程包装成wait_queue_t结构放入到监听socket的等待队列的尾部,然后通过schedule_timeout函数让当前进程睡眠timeo个时间。

该进程被唤醒只有:
1. 睡眠timeo后被timer定时器唤醒
2. accept事件到来
第二种唤醒场景是通过网络层的代码触发。在TCP V4协议中执行过程为:tcp_v4->tcp_v4_do_rcv->tcp_child_process,而在tcp_child_process方法中会调用父socket,也就是监听socket的parent->sk_data_ready(parent)方法,在sock_init_data的时候其定义为:sk->sk_data_ready = sock_def_readable

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
static void sock_def_readable(struct sock *sk, int len)
{
    struct socket_wq *wq;

    rcu_read_lock();
    wq = rcu_dereference(sk->sk_wq);
    if (wq_has_sleeper(wq))
        wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
                        POLLRDNORM | POLLRDBAND);
    sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    rcu_read_unlock();
}

sock_def_readable首先判断是否在等待队列中有睡眠进程,然后通过wake_up_interruptible_sync_poll进行唤醒。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
//include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m)               \
    __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))

//kernel/sched.c
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, void *key)
{
    unsigned long flags;
    int wake_flags = WF_SYNC;

    if (unlikely(!q))
        return;

    if (unlikely(!nr_exclusive))
        wake_flags = 0;

    spin_lock_irqsave(&q->lock, flags);
    __wake_up_common(q, mode, nr_exclusive, wake_flags, key);
    spin_unlock_irqrestore(&q->lock, flags);
}

可以看出最终还是通过__wake_up_common执行唤醒操作,并且nr_exclusive = 1说明一次只会唤醒一个进程,不会发生惊群。

其实在inet_csk_accept时候,先进行了lock_sock(sk)操作,理论上锁住了监听的socket,每次只有一个进程可以accept。但是实际上lock_sock(sk)的时候要是没有拿到锁也会进行睡眠,加入不做特殊处理也有可能惊群。

epoll的惊群解决方案

在之前的文章linux源码角度看epoll中介绍了,在使用epoll的时候会在注册事件之后会调用epoll_wait系统调用,该方法会调用ep_poll方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
...
    if (list_empty(&ep->rdllist)) {
        //没有事件,需要睡眠。当有事件到来时,睡眠会被ep_poll_callback函数唤醒
    //将current初始化为等待队列项wait后,放入ep->wg的等待队列中
        init_waitqueue_entry(&wait, current);
        wait.flags |= WQ_FLAG_EXCLUSIVE;
        __add_wait_queue(&ep->wq, &wait);

        for (;;) {
            //执行ep_poll_callback()时应当将当前进程唤醒,所以当前进程状态应该为“可唤醒”TASK_INTERRUPTIBLE
            set_current_state(TASK_INTERRUPTIBLE);
      //如果就绪队列不为空或者已超时则退出循环
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
      //如果当前进程收到信号则退出循环,返回EINTR错误
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            spin_unlock_irqrestore(&ep->lock, flags);
      //主动让出处理器,等待ep_poll_callback()将当前进程唤醒或者超时,返回值是剩余的时间。
      //从这里开始当前进程会进入睡眠状态,直到某些文件的状态就绪或者超时。
      //当文件状态就绪时,eventpoll的回调函数ep_poll_callback()会唤醒在ep->wq指向的等待队列中的进程。
            jtimeout = schedule_timeout(jtimeout);
            spin_lock_irqsave(&ep->lock, flags);
        }
        __remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }
...
}

其中在没有事件需要睡眠时,通过函数__add_wait_queue将当前进程放入等待队列的对头中。此外会先将WQ_FLAG_EXCLUSIVE赋给flags,表示该睡眠进程是一个互斥进程。睡眠的当前进程会被回调函数ep_poll_callback唤醒:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
...
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    if (waitqueue_active(&ep->poll_wait))
        pwake++;
...
}

#define wake_up_locked(x)       __wake_up_locked((x), TASK_NORMAL)

void __wake_up_locked(wait_queue_head_t *q, unsigned int mode)
{
    __wake_up_common(q, mode, 1, 0, NULL);
}

ep_poll_callback最终通过__wake_up_common来唤醒等待队列中的互斥进程。