线程
- 队列:先进先出
- 堆栈:后进先出
- 优先级:数字越小优先级越大,越先输出


import queueq = queue.Queue(3) # 先进先出-->队列 q.put('first') q.put(2) # q.put('third') # q.put(4) #由于没有人取走,就会卡主 q.put(4,block=False) #等同于q.get_nowait(), Ture 阻塞,Flase不阻塞,报异常满了 # # q.put(4,block=True,timeout=3)print(q.get()) print(q.get()) print(q.get()) print(q.get(block=True,timeout=3)) # 阻塞等待3秒 没有取走数据就报异常 # print(q.get(block=False)) #等同于q.get_nowait() # print(q.get_nowait()) q = queue.LifoQueue(3) #后进先出-->堆栈 q.put('first') q.put(2) q.put('third')print(q.get()) print(q.get()) print(q.get()) ''' 打印结果: third 2 first '''q = queue.PriorityQueue(3) #优先级队列 q.put((10,'one')) q.put((40,'two')) q.put((30,'three'))print(q.get()) print(q.get()) print(q.get()) ''' 数字越小优先级越高 打印结果 (10, 'one') (30, 'three') (40, 'two') '''
进程池线程池
- 池:是用来对进程(线程)的数量加以限制
- 进程池:计算密集型,用多进程
- 线程池:IO密集型,用多线程,例如:sockect网络通信就应该用多线程


from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random''' sockect网络通信是IO操作,所以用多线程 计算密集型:用多进程 '''def task(name):print('name:%s pid:%s run' %(name,os.getpid()))time.sleep(random.randint(1,3))if __name__ == '__main__':# pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数pool = ThreadPoolExecutor(5)for i in range(10):pool.submit(task,'yang%s' %i) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行 pool.shutdown(wait=True) # 类似join 代表往池子里面丢任务的入口关掉 计数器-1print('主') ''' 打印结果: name:yang0 pid:11120 run name:yang1 pid:11120 run name:yang2 pid:11120 run name:yang3 pid:11120 run name:yang4 pid:11120 runname:yang5 pid:11120 run name:yang6 pid:11120 run name:yang7 pid:11120 runname:yang8 pid:11120 run name:yang9 pid:11120 run 主 '''from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread import os,time,randomdef task():print('name:%s pid:%s run' %(currentThread().getName(),os.getpid()))time.sleep(random.randint(1,3))if __name__ == '__main__':# pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数pool = ThreadPoolExecutor(5)for i in range(10):pool.submit(task) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行 pool.shutdown(wait=True) # 类似join 代表往池子里面丢任务的入口关掉 计数器-1print('主') ''' 打印结果: name:ThreadPoolExecutor-0_0 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_0 pid:14052 run 主 '''
同步调用和异步调用
提交任务的两种方式:
- 同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行
- 异步调用:提交完任务后,不在原地等待任务执行完。回调机制:自动触发


#1.同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行from concurrent.futures import ThreadPoolExecutor import time import randomdef la(name):print('%s is laing' %name)time.sleep(random.randint(3,5))res = random.randint(7,13)*'#'return {'name':name,'res':res}def weigh(shit):name = shit['name']size = len(shit['res'])print('%s 拉了 <%s>kg' %(name,size))if __name__ == '__main__':pool = ThreadPoolExecutor(10)shit1 = pool.submit(la,'alex').result()weigh(shit1)shit2 = pool.submit(la,'yang').result()weigh(shit2)shit3 = pool.submit(la,'hang').result()weigh(shit3) ''' 打印结果: alex is laing alex 拉了 <8>kg yang is laing yang 拉了 <8>kg hang is laing hang 拉了 <7>kg '''


#2.异步调用:提交完任务后,不在原地等待任务执行完 from concurrent.futures import ThreadPoolExecutor import time import randomdef la(name):print('%s is laing' %name)time.sleep(random.randint(3,5))res = random.randint(7,13)*'#'return {'name':name,'res':res}# weigh({'name':name,'res':res}) # 这样写,所有功能 不能体现出解耦合def weigh(shit):shit = shit.result() # 拿到是一个对象,需要进行result()name = shit['name']size = len(shit['res'])print('%s 拉了 <%s>kg' %(name,size))if __name__ == '__main__':pool = ThreadPoolExecutor(10)shit1 = pool.submit(la,'alex').add_done_callback(weigh)shit2 = pool.submit(la,'yang').add_done_callback(weigh)shit3 = pool.submit(la,'hang').add_done_callback(weigh) ''' 打印结果: alex is laing yang is laing hang is laing hang 拉了 <10>kg alex 拉了 <7>kg yang 拉了 <12>kg '''
异步调用的应用


from concurrent.futures import ThreadPoolExecutor import requests import timedef get(url):print('GET %s'%url)response = requests.get(url)time.sleep(3)return {'url':url,'content':response.text}def parse(res):res = res.result()print('%s parse res is %s' %(res['url'],len(res['content'])))if __name__ == '__main__':urls = ['http://www.cnblogs.com/linhaifeng','https://www.python.org','https://www.openstack.org',]pool = ThreadPoolExecutor(2)for url in urls:pool.submit(get,url).add_done_callback(parse) ''' 打印结果: GET http://www.cnblogs.com/linhaifeng GET https://www.python.org http://www.cnblogs.com/linhaifeng parse res is 16320 GET https://www.openstack.org https://www.python.org parse res is 49273 https://www.openstack.org parse res is 64040 '''