多线程测试
import threading
import timeclass A(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):for i in range(3):print("我是线程A")time.sleep(1)class B(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):for i in range(3):print("我是线程B")time.sleep(1)
t1 = A()
t1.start()
t2 = B()
t2.start()
结果:
我是线程A
我是线程B
我是线程B
我是线程A
我是线程B
我是线程A
可以看到,两个线程交替执行
队列测试
import queuea = queue.Queue()
for i in range(3):a.put("hello")a.task_done()for i in range(4):print(a.get())
结果
hello
hello
hello
注意,这里线程一直没有结束,因为队列中总共有3个,但是出队要有4个,所以等3个都出来后就阻塞了