点击(此处)折叠或打开
- # two entities here that try to share a common resource, a queue
- import threading
- from threading import Thread,Event
- from queue import Queue
- import time,random
- import logging
- import sys
- logging.basicConfig(level=logging.DEBUG,format='%(asctime)s (%(threadName)-10s) %(message)s',)
- _sentinel=object()
- #_sentinel = None
- class Producer(threading.Thread):
- def __init__(self,queue):
- Thread.__init__(self)
- self.queue=queue
- self.daemon=True
- def run(self):
- for i in range(5):
- item=random.randint(0,256)
- self.queue.put(item)
- logging.debug('Producer notify: item %d appended to queue by %s\n' %(item,self.name))
- self.queue.put(_sentinel)
-
- class Consumer(threading.Thread):
- def __init__(self,queue):
- Thread.__init__(self)
- self.queue=queue
- self.daemon=True
-
- def run(self):
- while True:
- item=self.queue.get()
- if item is _sentinel:
- self.queue.put(_sentinel)
- break
- else:
- logging.debug('Consumer notify: %d poped from queue by %s' %(item,self.name))
- self.queue.task_done()
- def main():
- shared_queue=Queue()
- consumers = []
- #start the producer
- p1=Producer(shared_queue)
- p1.start()
-
- #start 3 consumer
- for i in range(0, 3):
- t=Consumer(shared_queue)
- t.start()
- consumers.append(t)
-
- #join the queue
- shared_queue.join()
-
- #stop producer
- p1.join()
-
- #stop consumers
- for t in consumers:
- t.join()
- if __name__=='__main__':
- main()
点击(此处)折叠或打开
- root@kali:/usr/local/src/py/network# ./thread_share_queue_hang.py
- 2018-09-05 20:20:01,481 (Thread-1 ) Producer notify: item 93 appended to queue by Thread-1
- 2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 144 appended to queue by Thread-1
- 2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 93 poped from queue by Thread-2
- 2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 45 appended to queue by Thread-1
- 2018-09-05 20:20:01,482 (Thread-2 ) Consumer notify: 144 poped from queue by Thread-2
- 2018-09-05 20:20:01,482 (Thread-3 ) Consumer notify: 45 poped from queue by Thread-3
- 2018-09-05 20:20:01,482 (Thread-1 ) Producer notify: item 106 appended to queue by Thread-1
- 2018-09-05 20:20:01,483 (Thread-4 ) Consumer notify: 106 poped from queue by Thread-4
- 2018-09-05 20:20:01,483 (Thread-1 ) Producer notify: item 219 appended to queue by Thread-1
- 2018-09-05 20:20:01,483 (Thread-2 ) Consumer notify: 219 poped from queue by Thread-2
- ^CTraceback (most recent call last):
- File "./thread_share_queue_hang.py", line 73, in <module>
- main()
- File "./thread_share_queue_hang.py", line 63, in main
- shared_queue.join()
这段代码会hang在63行,shared_queue.join(). 为什么呢?看官方文档的解释,join方法会阻塞直到queue里面所有的item都被处理完。producer 和3个consumer 都往queue里面放了一个None, 共有4个None, 而其中三个None被用来做sentinel 来使得3个consumer线程能够结束。最后留下一个None在queue里面,而没有人来处理最后的这个None,所以会导致queue.join()会一直hang.
点击(此处)折叠或打开
- Queue.join()
- Blocks until all items in the queue have been gotten and processed.
那么问题找到了,我就重新写了这个main()部分,但是我用了不同的思路:
第一种:
main thread来检查是否只剩下最后一个线程了,如果只剩下最后一个线程(即它自己)
就从queue里面取出最后一个元素处理它.用task_done()函数。
点击(此处)折叠或打开
- def main():
- shared_queue=Queue()
- consumers = []
- #start the producer
- p1=Producer(shared_queue)
- p1.start()
-
- #start 3 consumer
- for i in range(0, 3):
- t=Consumer(shared_queue)
- t.start()
- consumers.append(t)
-
- while True:
- #if I - the main thread also am the last thread
- #print(threading.active_count())
- if threading.active_count() == 1:
- #get the last elem from the queue
- if shared_queue.get() == _sentinel:
- shared_queue.task_done()
- break
- else:
- time.sleep(0.5)
第二种:
使用qsize() 方法来检查如果queue里面只剩下一个elem,那么我处理它就可以了。
因为这时候其他线程都已经结束了。但是这个qsize() 根据文档来说并不可靠。not reliable
点击(此处)折叠或打开
- #create shared queue
- shared_queue=Queue()
- consumers = []
- #start the producer
- p1=Thread(target=producer, args=(shared_queue, ))
- p1.start()
-
- #start 3 consumer
- for i in range(0, 3):
- t=Thread(target=consumer, args=(shared_queue,))
- t.start()
- consumers.append(t)
-
- while True:
- #print(threading.active_count())
- #qsize() method of queue.Queue instance
- #Return the approximate size of the queue (not
- #use the qsize() method to justify whether there is only 1 None left in the shared queue
- if shared_queue.qsize() == 1:
- #get the last elem from the queue
- if shared_queue.get_nowait() is None:
- shared_queue.task_done()
- break
- else:
- time.sleep(0.5)
第三种:
干脆我不对queue做join操作,只要我的线程做join就可以了。
点击(此处)折叠或打开
- shared_queue=Queue()
- consumers = []
- #start the producer
- p1=Producer(shared_queue)
- p1.start()
-
- #start 3 consumer
- for i in range(0, 3):
- t=Consumer(shared_queue)
- t.start()
- consumers.append(t)
- #block until all task in shared_queue are done
- #we can not do this, because we put 4 None in the queue, but only use 3 of them
- #as sentinel for consumer, thus use shared_queue.join() will block forever
- #shared_queue.join()
-
- #stop producer
- p1.join()
-
- #stop consumers
- for t in consumers:
- t.join()
这三种写法都达到了同样的效果,但是重新看了queue的文档,让我感觉学到了新知识。