首页 > 程序开发 > 移动开发 > 其他 >

linux工作队列 - workqueue_struct创建

2016-10-18

根据FLAG的不同,创建workqueue的API分好几种(见系列文章1说明),根据情况使用,但最终这些API都会调用到alloc_workqueue,这是一个宏定义,它的调用序列图如下所示:

1.创建workqueue代码分析

1.1整体代码分析

根据FLAG的不同,创建workqueue的API分好几种(见系列文章1说明),根据情况使用,但最终这些API都会调用到alloc_workqueue,这是一个宏定义,它的调用序列图如下所示:

创建wq整体序列图

这里重点介绍函数alloc_and_link_pwqs(),wq在此函数中创建:<喎"http://www.2cto.com/kf/ware/vc/" target="_blank" class="keylink">vcD4NCjxwcmUgY2xhc3M9"brush:java;"> static int alloc_and_link_pwqs(struct workqueue_struct *wq) { bool highpri = wq->flags & WQ_HIGHPRI; int cpu, ret; if (!(wq->flags & WQ_UNBOUND)) {------------------------------创建bound类型的wq wq->cpu_pwqs = alloc_percpu(struct pool_workqueue); if (!wq->cpu_pwqs) |------得到per-CPU pool_workqueue return -ENOMEM; for_each_possible_cpu(cpu) {---------------对于每个CPU,分配pool_workqueue和worker_pool struct pool_workqueue *pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);--分配对应cpu的pool_workqueue struct worker_pool *cpu_pools = per_cpu(cpu_worker_pools, cpu);--分配对应cpu的worker_pool,worker_pool的初始化在系统启动时创建,并加入到全局变量cpu_worker_pools中,具体系统启动是wq的初始化分析见第二节 init_pwq(pwq, wq, &cpu_pools[highpri]); mutex_lock(&wq->mutex); link_pwq(pwq);----------------------把对应的pool_workqueue加入到wq中,即链表字段wq->pwqs中 mutex_unlock(&wq->mutex); } return 0; } else if (wq->flags & __WQ_ORDERED) {-----------------------创建orderd类型的wq,only single pwq ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]); /* there should only be single pwq for ordering guarantee */ WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node || wq->pwqs.prev != &wq->dfl_pwq->pwqs_node), "ordering guarantee broken for workqueue %s\n", wq->name); return ret; } else {-----------------------------------------------------创建unbound类型的wq return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]); } }

从上面分析可以清晰了解到bound类型的wq的创建过程,下面分析unbound类型的wq创建过程

1.2unbound类型的wq创建

orderd类型的和unbound类型调用接口都是apply_workqueue_attrs,不同的是attrs不一样,ordered_wq_attrs和unbound_std_wq_attrs在wq初始化时创建,具体见第二节介绍,apply_workqueue_attrs简易序列图如下:
这里写图片描述

最终调用到函数alloc_unbound_pwq,主要是要得到pool_workqueue,代码分析如下:

static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
                    const struct workqueue_attrs *attrs)
{
    struct worker_pool *pool;
    struct pool_workqueue *pwq;

    lockdep_assert_held(&wq_pool_mutex);

    pool = get_unbound_pool(attrs);--------pool_workqueue必须指向相应的worker_pool,得到该种属性的worker_pool 
    if (!pool)
        return NULL;

    pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);--给pool_workqueue 分配内存
    if (!pwq) {
        put_unbound_pool(pool);
        return NULL;
    }

    init_pwq(pwq, wq, pool);
    return pwq;---------------------------返回pool_workqueue 

上面的代码很简单最主要的是要进入函数get_unbound_pool中,主要是要得到worker_pool ,分析如下:

static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
{
    u32 hash = wqattrs_hash(attrs);-------根据attrs计算对应的散列值
    struct worker_pool *pool;
    int node;
    int target_node = NUMA_NO_NODE;

    lockdep_assert_held(&wq_pool_mutex);

    /* do we already have a matching pool? */
    hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
        if (wqattrs_equal(pool->attrs, attrs)) {
            pool->refcnt++;
            return pool;------------------从全局散列表unbound_pool_hash中,根据attrs对比找到存在的worker_pool,找到就返回
        }
    }
    //走到这一步说明没有找到存在的worker_pool,那么下面就得新创建一个
    /* if cpumask is contained inside a NUMA node, we belong to that node */
    if (wq_numa_enabled) {
        for_each_node(node) {
            if (cpumask_subset(attrs->cpumask,
                       wq_numa_possible_cpumask[node])) {
                target_node = node;
                break;
            }
        }
    }

    /* nope, create a new one */
    pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);--创建一个worker_pool 
    if (!pool || init_worker_pool(pool) < 0)
        goto fail;

    lockdep_set_subclass(&pool->lock, 1);   /* see put_pwq() */
    copy_workqueue_attrs(pool->attrs, attrs);---------------------copy属性给worker_pool
    pool->node = target_node;

    /*
     * no_numa isn&#39;t a worker_pool attribute, always clear it.  See
     * &#39;struct workqueue_attrs&#39; comments for detail.
     */
    pool->attrs->no_numa = false;

    if (worker_pool_assign_id(pool) < 0)
        goto fail;

    /* create and start the initial worker */
    if (!create_worker(pool))-------------------------------------创建工作者线程,详细分析见第三节
        goto fail;

    /* install */
    hash_add(unbound_pool_hash, &pool->hash_node, hash);----------加入到全局unbound_pool_hash散列表中

    return pool;
fail:
    if (pool)
        put_unbound_pool(pool);
    return NULL;
}

2.系统启动时wq的初始化

系统初始化的时候会调用early_initcall(init_workqueues),下面来分析一下init_workqueues:

static int __init init_workqueues(void)
{
    int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
    int i, cpu;

    WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));

    BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
    cpumask_copy(wq_unbound_cpumask, cpu_possible_mask);

    pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);-----初始化pwq_cache,具体内存这块以后再补

    cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
    hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);

    wq_numa_init();
    //对每个cpu初始化worker_pool 
    /* initialize CPU pools */
    for_each_possible_cpu(cpu) {
        struct worker_pool *pool;

        i = 0;
        for_each_cpu_worker_pool(pool, cpu) {
            BUG_ON(init_worker_pool(pool));
            pool->cpu = cpu;
            cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
            pool->attrs->nice = std_nice[i++];
            pool->node = cpu_to_node(cpu);

            /* alloc pool ID */
            mutex_lock(&wq_pool_mutex);
            BUG_ON(worker_pool_assign_id(pool));
            mutex_unlock(&wq_pool_mutex);
        }
    }
    //对每个cpu创建工作者线程池
    /* create the initial worker */
    for_each_online_cpu(cpu) {
        struct worker_pool *pool;

        for_each_cpu_worker_pool(pool, cpu) {
            pool->flags &= ~POOL_DISASSOCIATED;
            BUG_ON(!create_worker(pool));
        }
    }
    //创建默认的unbound and ordered wq attrs
    /* create default unbound and ordered wq attrs */
    for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
        struct workqueue_attrs *attrs;

        BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
        attrs->nice = std_nice[i];
        unbound_std_wq_attrs[i] = attrs;

        /*
         * An ordered wq should have only one pwq as ordering is
         * guaranteed by max_active which is enforced by pwqs.
         * Turn off NUMA so that dfl_pwq is used for all nodes.
         */
        BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
        attrs->nice = std_nice[i];
        attrs->no_numa = true;
        ordered_wq_attrs[i] = attrs;
    }
    //创建workqueue_struct,一般我们不会另外再创建新的workqueue_struct,开发者使用这些就可以了
    system_wq = alloc_workqueue("events", 0, 0);
    system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
    system_long_wq = alloc_workqueue("events_long", 0, 0);
    system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
                        WQ_UNBOUND_MAX_ACTIVE);
    system_freezable_wq = alloc_workqueue("events_freezable",
                          WQ_FREEZABLE, 0);
    system_power_efficient_wq = alloc_workqueue("events_power_efficient",
                          WQ_POWER_EFFICIENT, 0);
    system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
                          WQ_FREEZABLE | WQ_POWER_EFFICIENT,
                          0);
    BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
           !system_unbound_wq || !system_freezable_wq ||
           !system_power_efficient_wq ||
           !system_freezable_power_efficient_wq);

    wq_watchdog_init();

    return 0;
}

3.创建工作者worker线程池

下面分析函数create_worker:

static struct worker *create_worker(struct worker_pool *pool)
{
    struct worker *worker = NULL;
    int id = -1;
    char id_buf[16];

    /* ID is needed to determine kthread name */
    id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);
    if (id < 0)
        goto fail;

    worker = alloc_worker(pool->node);-----------分配一个工作者worker 
    if (!worker)
        goto fail;

    worker->pool = pool;
    worker->id = id;

    if (pool->cpu >= 0)
        snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
             pool->attrs->nice < 0  ? "H" : "");
    else
        snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

    worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                          "kworker/%s", id_buf);---为工作者创建线程
    if (IS_ERR(worker->task))
        goto fail;

    set_user_nice(worker->task, pool->attrs->nice);
    kthread_bind_mask(worker->task, pool->attrs->cpumask);

    /* successful, attach the worker to the pool */
    worker_attach_to_pool(worker, pool);------------把工作者加入到工作者线程池worker_pool 

    /* start the newly created worker */
    spin_lock_irq(&pool->lock);
    worker->pool->nr_workers++;
    worker_enter_idle(worker);
    wake_up_process(worker->task);-----------------唤醒工作者
    spin_unlock_irq(&pool->lock);

    return worker;---------------------------------返回worker_pool 

fail:
    if (id >= 0)
        ida_simple_remove(&pool->worker_ida, id);
    kfree(worker);
    return NULL;
相关文章
最新文章
热点推荐