1. Why do we need process pools and thread pools?
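Official documentation: https://docs.python.org/dev/library/concurrent.futures.html
Starting a new process or thread for every task is expensive, and launching an unbounded number of them can exhaust the machine. A pool caps the number of workers and reuses them across many tasks. The concurrent.futures module provides a high-level interface for running calls asynchronously:
ThreadPoolExecutor: a thread pool that runs calls asynchronously
ProcessPoolExecutor: a process pool that runs calls asynchronously
Both implement the same interface, which is defined by the abstract Executor class.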
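Because both classes share the Executor interface, code written against one usually works with the other. A minimal sketch of that: `cube` and `run_with` are illustrative names, not from the original, and the same task runs through each executor with only the class changing.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def cube(n):
    # Module-level function, so it can also be pickled for ProcessPoolExecutor
    return n ** 3


def run_with(executor_cls, numbers):
    # Works for any Executor subclass: same constructor argument, same map/shutdown methods
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(cube, numbers))


if __name__ == '__main__':
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        print(cls.__name__, issubclass(cls, Executor), run_with(cls, [1, 2, 3]))
```

The `if __name__ == '__main__':` guard matters for ProcessPoolExecutor: on platforms that start worker processes by re-importing the main module, unguarded pool code would run again in every worker.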
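2. Basic methods
1. submit(fn, *args, **kwargs): submit a task asynchronously; it returns a Future immediately.
2. map(func, *iterables, timeout=None, chunksize=1): replaces a for loop of submit() calls and returns an iterator of results in input order. chunksize only has an effect with ProcessPoolExecutor.
3. shutdown(wait=True): equivalent to pool.close() followed by pool.join() on a multiprocessing.Pool.
   wait=True: wait until every task in the pool has finished and its resources have been released before continuing.
   wait=False: return immediately without waiting for the tasks in the pool to finish.
   Whatever the value of wait, the program as a whole still waits for all tasks to finish before it exits.
   submit and map must be called before shutdown; calling them afterwards raises RuntimeError.
4. result(timeout=None): a method of the Future returned by submit(); gets the task's return value, waiting up to timeout seconds.
5. add_done_callback(fn): also a Future method; registers a callback that is called with the Future once the task finishes.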
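A small sketch exercising submit(), result(), map() and shutdown() on a thread pool. `slow_double` is a hypothetical task used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_double(n):
    time.sleep(0.1)  # simulate some work
    return n * 2


pool = ThreadPoolExecutor(max_workers=2)

future = pool.submit(slow_double, 21)    # returns a Future immediately
print(future.result(timeout=5))          # blocks until the task is done (at most 5 s)

doubled = list(pool.map(slow_double, [1, 2, 3]))  # results come back in input order
print(doubled)

pool.shutdown(wait=True)                 # wait for all tasks, then release the threads

try:
    pool.submit(slow_double, 1)          # not allowed after shutdown
except RuntimeError as exc:
    print('RuntimeError:', exc)
```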
3. Process pool
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously.
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
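```python
from concurrent.futures import ProcessPoolExecutor
import os
import time


def task(name):
    # Only 4 distinct pids show up: the pool reuses its worker processes
    print('%s is running <pid: %s>' % (name, os.getpid()))
    time.sleep(2)


if __name__ == '__main__':
    # Without a pool you would start each process by hand:
    # p = Process(target=task, args=('child',))
    # p.start()
    pool = ProcessPoolExecutor(4)  # process pool, max_workers=4
    for i in range(10):  # 10 tasks in total, run by at most 4 worker processes at a time
        pool.submit(task, 'child process %s' % i)
    print('main')  # printed right away: submit() does not wait for the tasks
```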
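The example above discards the tasks' return values. A sketch of collecting them from a process pool, assuming a CPU-bound task (`sum_of_squares` is a hypothetical example). Because arguments and results travel between processes by pickling, the task is a module-level function and its inputs and outputs are plain ints.

```python
from concurrent.futures import ProcessPoolExecutor


def sum_of_squares(n):
    # CPU-bound work; each call runs in a worker process, so the GIL does not serialize them
    return sum(i * i for i in range(n))


def main():
    sizes = [10_000, 20_000, 30_000, 40_000]
    # Leaving the with-block calls pool.shutdown(wait=True)
    with ProcessPoolExecutor(max_workers=4) as pool:
        # map() yields results in the order of the inputs, not in completion order
        results = list(pool.map(sum_of_squares, sizes))
    for n, total in zip(sizes, results):
        print(n, total)
    return results


if __name__ == '__main__':
    main()
```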
4. Thread pool
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
Changed in version 3.8: Default value of max_workers is changed to min(32, os.cpu_count() + 4).
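A minimal sketch of a thread pool for I/O-style work, using the thread_name_prefix argument so that the worker names show up in the output. `io_task` is a hypothetical task, and time.sleep() stands in for real blocking I/O such as a network request.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def io_task(n):
    # time.sleep stands in for blocking I/O; the GIL is released while a thread waits
    time.sleep(0.1)
    return n, threading.current_thread().name


with ThreadPoolExecutor(max_workers=3, thread_name_prefix='worker') as pool:
    futures = [pool.submit(io_task, i) for i in range(6)]
    results = [f.result() for f in futures]

for n, name in results:
    print(n, name)  # worker threads are named worker_0, worker_1, ...
```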
5. map: replacing for + submit
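```python
from concurrent.futures import ThreadPoolExecutor
import random
import threading
import time


def task(n):
    # All threads share one process, so print the thread name rather than os.getpid()
    print('%s is running' % threading.current_thread().name)
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)
    # Equivalent for + submit version:
    # for i in range(1, 12):
    #     future = executor.submit(task, i)
    results = executor.map(task, range(1, 12))  # map replaces for + submit
    print(list(results))  # results come back in input order
    executor.shutdown()
```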
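To see that map() really is shorthand for the for + submit loop, a sketch that computes the same results both ways, plus map() over two iterables (mirroring the `*iterables` parameter listed earlier). `square` is a hypothetical task.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    return n ** 2


with ThreadPoolExecutor(max_workers=3) as executor:
    # Explicit version: one submit() per item, then read each Future in order
    futures = [executor.submit(square, i) for i in range(1, 12)]
    via_submit = [f.result() for f in futures]

    # map() version: the same work, results yielded in input order
    via_map = list(executor.map(square, range(1, 12)))

    # With several iterables, map() passes one item from each, like the built-in map()
    powers = list(executor.map(pow, [2, 3, 4], [5, 2, 3]))

print(via_submit == via_map)  # True
print(powers)                 # [32, 9, 64]
```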
6. Asynchronous calls and the callback mechanism
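(1) Two ways to submit a task
1. Synchronous call: after submitting a task, wait for its result before running the next line of code, so the program runs serially.
2. Asynchronous call: after submitting a task, carry on without waiting for it to finish; the result is collected later, for example with result() or a callback.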
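To make the difference concrete, a sketch timing both styles on the same pool; `work` is a hypothetical task that just sleeps for 0.2 s. The synchronous loop should take roughly three times as long as the asynchronous one, though exact timings depend on the machine.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def work(n):
    time.sleep(0.2)  # simulate a 0.2 s blocking task
    return n


pool = ThreadPoolExecutor(max_workers=3)

# Synchronous: wait for each result before submitting the next task (about 3 x 0.2 s)
start = time.perf_counter()
sync_results = [pool.submit(work, i).result() for i in range(3)]
sync_time = time.perf_counter() - start

# Asynchronous: submit everything first, collect the results afterwards (about 0.2 s)
start = time.perf_counter()
futures = [pool.submit(work, i) for i in range(3)]
async_results = [f.result() for f in futures]
async_time = time.perf_counter() - start

pool.shutdown()
print('sync %.2fs, async %.2fs' % (sync_time, async_time))
```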
(2) Synchronous call
```python
from concurrent.futures import ThreadPoolExecutor
import random
import time


# Eat: takes a random 1-5 s and returns a random amount of food
def eat(name):
    print('%s is eating' % name)
    time.sleep(random.randint(1, 5))
    ret = random.randint(7, 13) * '#'
    return {'name': name, 'ret': ret}


# Weigh: report the amount eaten
def weight(body):
    name = body['name']
    size = len(body['ret'])
    print('%s now weighs %s' % (name, size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(15)
    # .result() blocks until eat() finishes, so the three meals run one after another
    rice1 = pool.submit(eat, 'alex').result()  # run eat and wait for its result
    weight(rice1)                              # then run weight
    rice2 = pool.submit(eat, 'jack').result()
    weight(rice2)
    rice3 = pool.submit(eat, 'tom').result()
    weight(rice3)
    pool.shutdown()
```
(2) Synchronous call, version 2
(3) Callback functions
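A minimal sketch of the callback style, reworking the eat/weight example from the synchronous section: add_done_callback() calls weight as soon as each eat task finishes, so no meal waits for the previous one. The callback receives the finished Future, not eat's return value, so it calls .result() itself. The sleep times are shortened and the `log` list is added only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import random
import threading
import time

log = []                     # collects what the callbacks report (illustration only)
log_lock = threading.Lock()  # callbacks may run in different worker threads


def eat(name):
    print('%s is eating' % name)
    time.sleep(random.uniform(0.05, 0.2))  # shortened from the 1-5 s of the original
    return {'name': name, 'ret': random.randint(7, 13) * '#'}


def weight(future):
    # Called with the finished Future, not with eat's return value
    body = future.result()
    size = len(body['ret'])
    print('%s now weighs %s' % (body['name'], size))
    with log_lock:
        log.append((body['name'], size))


with ThreadPoolExecutor(15) as pool:
    for name in ('alex', 'jack', 'tom'):
        # submit() returns at once; weight runs as soon as that eat task finishes
        pool.submit(eat, name).add_done_callback(weight)
# Leaving the with-block waits for every task, and therefore every callback, to finish
```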
(4) What is a hook function?
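A hook is part of the Windows message-handling mechanism. By installing a "hook", an application can filter all messages and events at the system level and access messages that normally cannot be accessed. In essence, a hook is a piece of code that processes system messages and is attached to the system through a system call. (Definition from Baidu Baike)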
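In front-end development, a hook function is a function that runs before the function you are interested in: you "hook" that function, so whenever it runs, your function runs first. This concept (or phenomenon) is very similar to AOP (aspect-oriented programming).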
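In Python, the "run my function first whenever the hooked function runs" idea is commonly written as a decorator. A minimal sketch; `before`, `log_call` and `greet` are hypothetical names. Future.add_done_callback() is a similar idea turned around: the hook runs after the task finishes instead of before the function starts.

```python
import functools

calls = []  # records the order in which things run


def before(hook):
    # Decorator factory: call `hook` every time the decorated function runs, just before it
    def decorator(func):
        @functools.wraps(func)  # keep greet's name and docstring on the wrapper
        def wrapper(*args, **kwargs):
            hook(func.__name__, args, kwargs)
            return func(*args, **kwargs)
        return wrapper
    return decorator


def log_call(name, args, kwargs):
    calls.append('hook: %s%r' % (name, args))


@before(log_call)
def greet(name):
    calls.append('greet: %s' % name)
    return 'hello ' + name


print(greet('alex'))  # hello alex
print(calls)
```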
7. Thread pool application: a web crawler
(1) The requests module
```python
import requests

# Request a URL and get the page source
response = requests.get('http://www.cnblogs.com/venicid/p/8923096.html')
print(response)       # prints <Response [200]>
print(response.text)  # the page source as text
```
(2) Thread-pool crawler
```python
import time
from concurrent.futures import ThreadPoolExecutor

import requests


# Request a URL and return its page source
def get_code(url):
    print('GET ', url)
    response = requests.get(url, timeout=10)
    time.sleep(3)  # simulate a slow site
    code = response.text
    return {'url': url, 'code': code}


# Callback: receives the finished Future and prints the length of the page source
def print_len(future):
    ret = future.result()
    url = ret['url']
    code_len = len(ret['code'])
    print('%s length is %s' % (url, code_len))


if __name__ == '__main__':
    url_list = [
        'http://www.cnblogs.com/venicid/default.html?page=2',
        'http://www.cnblogs.com/venicid/p/8747383.html',
        'http://www.cnblogs.com/venicid/p/8923096.html',
    ]
    pool = ThreadPoolExecutor(2)
    # Alternative without callbacks (results in input order):
    # for ret in pool.map(get_code, url_list): print(ret['url'], len(ret['code']))
    for url in url_list:
        pool.submit(get_code, url).add_done_callback(print_len)
    pool.shutdown(wait=True)
```
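One caveat with the callback style above: if get_code raises (a timeout, a DNS failure), the Future stores the exception, ret.result() re-raises it inside the callback, and concurrent.futures logs and ignores exceptions raised by callbacks, so the main thread never sees the failure. A sketch that checks future.exception() first; to keep it runnable offline, `fake_get_code`, `make_callback` and the example.com URLs are hypothetical stand-ins for get_code and real pages.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

lengths = {}  # url -> page length for successful fetches
errors = {}   # url -> exception for failed fetches
lock = threading.Lock()


def fake_get_code(url):
    # Hypothetical stand-in for get_code: no network access, one URL always fails
    time.sleep(0.1)
    if 'broken' in url:
        raise ConnectionError('could not reach %s' % url)
    return {'url': url, 'code': '<html>%s</html>' % url}


def make_callback(url):
    # Bind the URL so the callback still knows it when the fetch failed
    def print_len(future):
        exc = future.exception()  # None if the task succeeded
        with lock:
            if exc is not None:
                errors[url] = exc
                print('%s failed: %s' % (url, exc))
            else:
                lengths[url] = len(future.result()['code'])
                print('%s length is %s' % (url, lengths[url]))
    return print_len


url_list = [
    'http://example.com/a',
    'http://example.com/broken',
    'http://example.com/b',
]

with ThreadPoolExecutor(2) as pool:
    for url in url_list:
        pool.submit(fake_get_code, url).add_done_callback(make_callback(url))
```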