|
1 |
| -使用信号进行线程同步 |
2 |
| -==================== |
| 1 | +使用信号量进行线程同步 |
| 2 | +====================== |
| 3 | + |
| 4 | +信号量由E.Dijkstra发明并第一次应用在操作系统中,信号量是由操作系统管理的一种抽象数据类型,用于在多线程中同步对共享资源的使用。本质上说,信号量是一个内部数据,用于标明当前的共享资源可以有多少并发读取。 |
| 5 | + |
| 6 | +同样的,在threading模块中,信号量的操作有两个函数,即 ``acquire()`` 和 ``release()`` ,解释如下: |
| 7 | + |
| 8 | +- 每当线程想要读取关联了信号量的共享资源时,必须调用 ``acquire()`` ,此操作减少信号量的内部变量, 如果此变量的值非负,那么分配该资源的权限。如果是负值,那么线程被挂起,直到有其他的线程释放资源。 |
| 9 | +- 当线程不再需要该共享资源,必须通过 ``release()`` 释放。这样,信号量的内部变量增加,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。 |
| 10 | + |
| 11 | +.. image:: ../images/semaphores.png |
| 12 | + |
| 13 | +虽然表面上看信号量机制没什么明显的问题,如果信号量的等待和通知操作都是原子的,确实没什么问题。但如果不是,或者两个操作有一个终止了,就会导致糟糕的情况。 |
| 14 | + |
| 15 | +举个例子,假设有两个并发的线程,都在等待一个信号量,目前信号量的内部值为1。假设第线程A将信号量的值从1减到0,这时候控制权切换到了线程B,线程B将信号量的值从0减到-1,并且在这里被挂起等待,这时控制权回到线程A,信号量已经成为了负值,于是第一个线程也在等待。 |
| 16 | + |
| 17 | +这样的话,尽管当时的信号量是可以让线程访问资源的,但是因为费原子操作导致了所有的线程都在等待状态。 |
| 18 | + |
| 19 | +|ready| |
| 20 | +------- |
| 21 | + |
| 22 | +下面的代码展示了信号量的使用,我们有两个线程, ``producer()`` 和 ``consumer()`` ,它们使用共同的资源,即item。 ``producer()`` 的任务是生产item, ``consumer()`` 的任务是消费item。 |
| 23 | + |
| 24 | +当item还没有被生产出来, ``consumer()`` 一直等待,当item生产出来, ``producer()`` 线程通知消费者资源可以使用了。 |
| 25 | + |
| 26 | +|how| |
| 27 | +----- |
| 28 | + |
| 29 | +在以下的代码中,我们使用生产者-消费者模型展示通过信号量的同步。当生产者生产出item,便释放信号量。然后消费者拿到资源进行消费。 :: |
| 30 | + |
| 31 | + # -*- coding: utf-8 -*- |
| 32 | + |
| 33 | + """Using a Semaphore to synchronize threads""" |
| 34 | + import threading |
| 35 | + import time |
| 36 | + import random |
| 37 | + |
| 38 | + # The optional argument gives the initial value for the internal |
| 39 | + # counter; |
| 40 | + # it defaults to 1. |
| 41 | + # If the value given is less than 0, ValueError is raised. |
| 42 | + semaphore = threading.Semaphore(0) |
| 43 | + |
| 44 | + def consumer(): |
| 45 | + print("consumer is waiting.") |
| 46 | + # Acquire a semaphore |
| 47 | + semaphore.acquire() |
| 48 | + # The consumer have access to the shared resource |
| 49 | + print("Consumer notify : consumed item number %s " % item) |
| 50 | + |
| 51 | + def producer(): |
| 52 | + global item |
| 53 | + time.sleep(10) |
| 54 | + # create a random item |
| 55 | + item = random.randint(0,1000) |
| 56 | + print("producer notify : produced item number %s" % item) |
| 57 | + # Release a semaphore, incrementing the internal counter by one. |
| 58 | + # When it is zero on entry and another thread is waiting for it |
| 59 | + # to become larger than zero again, wake up that thread. |
| 60 | + semaphore.release() |
| 61 | + |
| 62 | + if __name__ == '__main__': |
| 63 | + for i in range (0,5) : |
| 64 | + t1 = threading.Thread(target=producer) |
| 65 | + t2 = threading.Thread(target=consumer) |
| 66 | + t1.start() |
| 67 | + t2.start() |
| 68 | + t1.join() |
| 69 | + t2.join() |
| 70 | + print("program terminated") |
| 71 | + |
| 72 | +程序会运行5轮,结果如下: |
| 73 | + |
| 74 | +.. image:: ../images/Page-71-Image-11.png |
| 75 | + |
| 76 | +|work| |
| 77 | +------ |
| 78 | + |
| 79 | + |
0 commit comments