通过锁可以使数据结构线程安全(thread safe),但具体如何加锁则决定了该数据结构的效率。本章将探讨怎么给数据结构加锁,才能让该结构功能正确的同时保证高性能。
并发计数器 下面先看一个简单的非并发的 计数器,然后一步步的在此基础上构建一个并发安全且高性能的计数器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 typedef struct counter_t { int value; } counter_t ; void init (counter_t *c) { c->value = 0 ; } void increment (counter_t *c) { c->value++; } void decrement (counter_t *c) { c->value--; } int get (counter_t *c) { return c->value; }
可以看到计数器的代码非常简单,而我们想其并发安全显而易见的方法便是在结构体中添加一把锁,在数据做修改操作的临界区代码添加上这把锁,修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 typedef struct counter_t { int value; pthread_mutex_t lock; } counter_t ; void init (counter_t *c) { c->value = 0 ; Pthread_mutex_init(&c->lock, NULL ); } void increment (counter_t *c) { Pthread_mutex_lock(&c->lock); c->value++; Pthread_mutex_unlock(&c->lock); } void decrement (counter_t *c) { Pthread_mutex_lock(&c->lock); c->value--; Pthread_mutex_unlock(&c->lock); } int get (counter_t *c) { Pthread_mutex_lock(&c->lock); int rc = c->value; Pthread_mutex_unlock(&c->lock); return rc; }
这样做在多CPU环境下性能很差,因为这种锁导致了多 CPU 情况下也只允许一个线程在运行,其他都在自旋等待,没发挥出多 CPU 的优势。相当于退化成串行,并且还增加了锁的开销。理想情况下,虽然工作量增多,但并行执行后,完成任务的时间并没有增加。
可扩展的计数器(扩展的意思是支持多 CPU) 为了解决上面的问题,有一种解决方法称为:懒惰计数器(sloppy counter)
懒惰计数器通过多个局部计数器 和一个全局计数器 来实现一个逻辑计数器,其中每个 CPU 核心有一个局部计数器。具体来说,在 4 个 CPU 的机器上,有 4 个局部计数器和 1 个全局计数器。除了这些计数器,还有锁:每个局部计数器有一个锁,全局计数器有一个。
懒惰计数器的基本思想是这样的。如果一个核心上的线程想增加计数器,那就增加它的局部计数器,访问这个局部计数器是通过对应的局部锁 同步的。因为每个 CPU 有自己的局部计数器,不同 CPU 上的线程不会竞争,所以计数器的更新操作可扩展性好。
为了保持全局计数器更新(以防某个线程要读取该值),局部值会定期转移给全局计数器 ,方法是获取全局锁 ,让全局计数器加上局部计数器的值,然后将局部计数器置零
局部转全局的频度,取决于一个阈值 ,这里称为 S(表示 sloppiness)。S 越小,懒惰计数器则越趋近于非扩展的计数器。S 越大,扩展性越强,但是全局计数器与实际计数的偏差越大。
在这个例子中,阈值 S 设置为 5,4 个 CPU 上分别有一个线程更新局部计数器 L1,…, L4。随着时间增加,全局计数器 G 的值也会记录下来。每一段时间,局部计数器可能会增加。如果局部计数值增加到阈值 S,就把局部值转移到全局计数器,局部计数器清零。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 typedef struct counter_t { int global; pthread_mutex_t glock; int local[NUMCPUS]; pthread_mutex_t llock[NUMCPUS]; int threshold; } counter_t ; void init (counter_t *c, int threshold) { c->threshold = threshold; c->global = 0 ; pthread_mutex_init(&c->glock, NULL ); int i; for (i = 0 ; i < NUMCPUS; i++) { c->local[i] = 0 ; pthread_mutex_init(&c->llock[i], NULL ); } } void update (counter_t *c, int threadID, int amt) { pthread_mutex_lock(&c->llock[threadID]); c->local[threadID] += amt; if (c->local[threadID] >= c->threshold) { pthread_mutex_lock(&c->glock); c->global += c->local[threadID]; pthread_mutex_unlock(&c->glock); c->local[threadID] = 0 ; } pthread_mutex_unlock(&c->llock[threadID]); } int get (counter_t *c) { pthread_mutex_lock(&c->glock); int val = c->global; pthread_mutex_unlock(&c->glock); return val; }
并发链表 我们和上面一样,先添加一把大锁保证了并发安全,然后再进一步探讨如何优化性能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 typedef struct node_t { int key; struct node_t *next ; } node_t ; typedef struct list_t { node_t *head; pthread_mutex_t lock; } list_t ; void List_Init (list_t *L) { L->head = NULL ; pthread_mutex_init(&L->lock, NULL ); } int List_Insert (list_t *L, int key) { pthread_mutex_lock(&L->lock); node_t *new = malloc (sizeof (node_t )); if (new == NULL ) { perror("malloc" ); pthread_mutex_unlock(&L->lock); return -1 ; } new->key = key; new->next = L->head; L->head = new; pthread_mutex_unlock(&L->lock); return 0 ; } int List_Lookup (list_t *L, int key) { pthread_mutex_lock(&L->lock); node_t *curr = L->head; while (curr) { if (curr->key == key) { pthread_mutex_unlock(&L->lock); return 0 ; } curr = curr->next; } pthread_mutex_unlock(&L->lock); return -1 ; }
从代码中可以看出,插入函数 入口处获取锁,结束时释放锁。如果 malloc 失败(在极少的时候),会有一点小问题,在这种情况下,代码在插入失败之前,必须释放锁。
我们调整代码,让获取锁和释放锁只环绕插入代码的真正临界区(缩小临界区)。前面的方法有效是因为部分工作实际上不需要锁,假定 malloc()是线程安全的,每个线程都可以调用它,不需要担心竞争条件和其他并发缺陷。只有在更新共享列表时需要持有锁。下面展示了这些修改的细节。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 void List_Init (list_t *L) { L->head = NULL ; pthread_mutex_init(&L->lock, NULL ); } void List_Insert (list_t *L, int key) { node_t *new = malloc (sizeof (node_t )); if (new == NULL ) { perror("malloc" ); return ; } new->key = key; pthread_mutex_lock(&L->lock); new->next = L->head; L->head = new; pthread_mutex_unlock(&L->lock); } int List_Lookup (list_t *L, int key) { int rv = -1 ; pthread_mutex_lock(&L->lock); node_t *curr = L->head; while (curr) { if (curr->key == key) { rv = 0 ; break ; } curr = curr->next; } pthread_mutex_unlock(&L->lock); return rv; }
现在已经能保证这个链表的并发安全了,想要解决性能问题,可以使用一种叫过手锁(hand-over-hand locking,也叫锁耦合,lock coupling),其原理十分简单,就是每个节点都有一个锁,替代之前整个链表一个锁。遍历链表的时候,首先抢占下一个节点的锁,然后释放当前节点的锁。
从理论上来说确实有点合理,但实际上在遍历的时候,每个节点都要加锁、解锁,而这个开销也是十分巨大的,很难说比单锁的方法快。即使有大量的线程和很大的链表,也不一定就比单锁快,它只适用于十分单一且特殊的场景,或者夹杂在某些方案里面。
并发队列 现在我们知道可以用标准的方法来创建一个并发安全的数据结构:添加一把大锁。现在队列这种数据结构其实和链表很像,只是队列的操作有限制,因此大锁这种方案我们跳过 。我们来看看 Michael 和 Scott 设计的更并发的队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 typedef struct __node_t { int value; struct __node_t *next ; } node_t ; typedef struct queue_t { node_t *head; node_t *tail; pthread_mutex_t headLock; pthread_mutex_t tailLock; } queue_t ; void Queue_Init (queue_t *q) { node_t *tmp = malloc (sizeof (node_t )); tmp->next = NULL ; q->head = q->tail = tmp; pthread_mutex_init(&q->headLock, NULL ); pthread_mutex_init(&q->tailLock, NULL ); } void Queue_Enqueue (queue_t *q, int value) { node_t *tmp = malloc (sizeof (node_t )); assert(tmp != NULL ); tmp->value = value; tmp->next = NULL ; pthread_mutex_lock(&q->tailLock); q->tail->next = tmp; q->tail = tmp; pthread_mutex_unlock(&q->tailLock); } int Queue_Dequeue (queue_t *q, int *value) { pthread_mutex_lock(&q->headLock); node_t *tmp = q->head; node_t *newHead = tmp->next; if (newHead == NULL ) { pthread_mutex_unlock(&q->headLock); return -1 ; } *value = newHead->value; q->head = newHead; pthread_mutex_unlock(&q->headLock); free (tmp); return 0 ; }
这段代码十分巧妙,用到了两个锁和一个哨兵节点。一个锁负责队头,另一个负责队尾,使得入队出队操作可以并发执行。而哨兵节点是在初始化的时候分配的,利用它分开了头和尾的操作。
并发散列表 最后讨论的是一个广泛应用的数据结构——散列表,为了突出重点,这里不对扩缩容做深入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #define BUCKETS (101) typedef struct hash_t { list_t lists[BUCKETS]; } hash_t ; void Hash_Init (hash_t *H) { int i; for (i = 0 ; i < BUCKETS; i++) { List_Init(&H->lists[i]); } } int Hash_Insert (hash_t *H, int key) { int bucket = key % BUCKETS; return List_Insert(&H->lists[bucket], key); } int Hash_Lookup (hash_t *H, int key) { int bucket = key % BUCKETS; return List_Lookup(&H->lists[bucket], key); }
这个散列表使用我们上面实现的并发链表 ,性能特别好。每个散列桶(每个桶都是一个链表)都有一个锁,而不是整个散列表只有一个锁,从而支持许多并发操作,其实就是分段加锁 了。
这个简单的并发散列表扩展性(性能)极好,相较于单纯的链表。原因就是缩小了临界区,减少了不同线程间操作的冲突。
小结 并发其实并不意味着高性能,在实现并发数据结构时,先从最简单的开始(也就是加一把大锁),有性能问题的时候再做优化。关于最后一点,避免不成熟的优化 (premature optimization),对于所有关心性能的开发者都有用。我们让整个应用的某一小部分变快,却没有提高整体性能,其实没有价值