1.多進程實現方式(類似于多線程)
1 import multiprocessing 2 import time,threading 3 4 def thread_run():#定義一個線程函數 5 print("我是子線程%s" %threading.get_ident()) ?#threading.get_ident()函數獲取當前線程的id 6 def run(name):#定義一個進程函數 7 time.sleep(1) 8 print("hello,我是進程%s" %name) 9 t = threading.Thread(target=thread_run(),)#在進程下運行子線程 10 t.start() 11 if __name__ == '__main__': ?#注意:啟多進程必須要加這個判斷,否則會有意想不到的錯誤 12 for i in range(3): 13 p = multiprocessing.Process(target=run,args=('bob %s' %i,)) 14 p.start() 15 p.join() 16 17 輸出: 18 hello,我是進程bob 0 19 我是子線程28176 20 hello,我是進程bob 1 21 我是子線程27016 22 hello,我是進程bob 2 23 我是子線程27516
2.進程都是由父進程創建和啟動的
1 from multiprocessing import Process 2 import os 3 4 5 def info(title): 6 print(title) 7 print('module name:', __name__) 8 print('當前進程父進程id:', os.getppid()) 9 print('當前進程 id:', os.getpid()) 10 print("\n\n") 11 12 13 def f(name): 14 info('\033[31;1mcalled from child process function f\033[0m') 15 print('hello', name) 16 17 if __name__ == '__main__': 18 info('\033[32;1mmain process line\033[0m') 19 p = Process(target=f, args=('bob',)) # 20 p.start() 21 22 輸出: 23 main process line 24 module name: __main__ 25 當前進程父進程id: 12468 #這是pycharm的進程號 26 當前進程 id: 25692 #這是第一個info函數的進程號 27 28 29 30 called from child process function f 31 module name: __mp_main__ 32 當前進程父進程id: 25692 #這是第一個info函數的進程號 33 當前進程 id: 27252 #這是f函數內調用的info函數的進程號 34 35 36 37 hello bob
3.進程間是不共享內存的(不同于線程),即進程之間是不能直接交換信息的
可以用以下方式實現進程間通信,用Queue隊列(線程中用queue)
1 import threading 2 from multiprocessing import Process ,Queue 3 4 5 def f(zi_q):#定義一個子進程函數,它往Queue里放值 6 zi_q.put([42,None,'hello']) 7 8 if __name__ == '__main__': 9 fu_q = Queue() ?#實例化父進程的一個隊列Queue 10 zi_p = Process(target=f, args=(fu_q,))#將父進程的Queue作為參數傳給子進程zi_p,這里父進程的Queue和子進程的Queue 11 #不是同一個Queue,這里fu_q作為參數傳給子進程的時候,是將fu_q 12 #復制了一份傳給了子進程,子進程將值賦給子進程的Queue后,函數內部 13 #又通過pickle形式將值傳給了父進程的Queue(fu_q) 14 zi_p.start() 15 print(fu_q.get()) #父進程獲取Queue內的值 16 zi_p.join() 17 18 19 輸出: 20 [42, None, 'hello']
4.實現不同進程間通信的另外一種方法
?(1)管道(Pipe()),類似于socket
1 from multiprocessing import Process, Pipe 2 def f(conn): 3 conn.send([42, 'hello'])#類似于socket,這里子進程發了兩次,下邊父進程就需要收兩次 4 conn.send([32, 'yyyy']) 5 print(conn.recv()) #子進程也可以接收父進程發過來的信息 6 conn.close() 7 8 if __name__ == '__main__': 9 parent_conn, child_conn = Pipe() 10 p = Process(target=f, args=(child_conn,)) 11 p.start() 12 print(parent_conn.recv())#收兩次 13 print(parent_conn.recv()) 14 parent_conn.send('我是父進程A') #父進程可以給子進程發信息 15 p.join() 16 17 輸出: 18 [42, 'hello'] 19 [32, 'yyyy'] 20 我是父進程A
5.實現進程間內存的共享(所謂共享即所有進程操作同一份列表、字典等等,其實也不真的是同一份數據,只不過是程序內部通過復制原始列表、字典給進程,進程運行以后修改列表、字典,最后合為一個列表、字典,讓人們以為是所有進程共享了相同的內存)
manager模塊可實現這一功能
1 from multiprocessing import Process, Manager 2 import os 3 def f(d, li): 4 d[os.getpid()] = os.getpid() #生成進程號字典 5 li.append(os.getpid()) #生成進程號列表 6 print(li) 7 8 if __name__ == '__main__': 9 with Manager() as manager: #等同于manager = Manager() 10 d = manager.dict() #創建一個字典,可在多個進程間共享和傳遞 11 li = manager.list()#創建一個列表,可在多個進程間共享和傳遞 12 p_list = [] 13 for i in range(5):#生成5個進程 14 p = Process(target=f, args=(d, li)) 15 p.start() 16 p_list.append(p) #將這5個進程放進一個列表里 17 for res in p_list: #讓每個進程都執行完 18 res.join() 19 print(d) 20 21 輸出: 22 [53812] 23 [53812, 51612] 24 [53812, 51612, 53520] 25 [53812, 51612, 53520, 53696] 26 [53812, 51612, 53520, 53696, 53608] 27 {53520: 53520, 51612: 51612, 53608: 53608, 53812: 53812, 53696: 53696}
6.進程鎖,防止在屏幕上或日志里打印亂了
??如這種亂:hello worhello world 8
1 from multiprocessing import Process, Lock 2 3 4 def f(l, i): 5 l.acquire()#獲得一把鎖 6 print('hello world', i) 7 l.release()#釋放一把鎖 8 if __name__ == '__main__': 9 lock = Lock() 10 for num in range(10): 11 Process(target=f, args=(lock, num)).start() 12 13 輸出: 14 hello world 2 15 hello world 1 16 hello world 4 17 hello world 3 18 hello world 8 19 hello world 0 20 hello world 7 21 hello world 5 22 hello world 6 23 hello world 9
7.很重要? 進程池????防止進程啟動太多,消耗太多資源,一般是幾個cpu就啟幾個進程
?
1 from multiprocessing import Process, Pool 2 import time 3 import os 4 5 def foo(i): 6 time.sleep(1) 7 print("正在執行子進程", os.getpid()) 8 return os.getpid() #返回的是子進程的id號 9 10 def bar(arg): #定義回調函數,參數arg是foo函數的返回值 11 print("子進程%s執行完畢,主進程%s調用的我"% (arg, os.getpid())) 12 13 if __name__ == '__main__': 14 pool = Pool(5) #允許進程池同時放入5個進程 15 print("主進程", os.getpid()) 16 for i in range(10): 17 pool.apply_async(func=foo, args=(i,), callback=bar)#異步,異步最好,callback后跟回調函數,回調函數是由主進程執行的 18 #回調函數的參數不用寫,默認就是前邊的func 19 # pool.apply(func=foo, args=(i,)) #同步,是串行的,看不出是5個5個的執行 20 print("主進程:end") 21 pool.close() #注意,這里要先close,再join,否則會有錯誤 22 pool.join()
輸出:
1 主進程 57784 2 主進程:end 3 正在執行子進程 58060 4 子進程58060執行完畢,主進程57784調用的我 5 正在執行子進程 55428 6 子進程55428執行完畢,主進程57784調用的我 7 正在執行子進程 58356 8 子進程58356執行完畢,主進程57784調用的我 9 正在執行子進程 56716 10 子進程56716執行完畢,主進程57784調用的我 11 正在執行子進程 57380 12 子進程57380執行完畢,主進程57784調用的我
13 正在執行子進程 58060 14 子進程58060執行完畢,主進程57784調用的我 15 正在執行子進程 55428 16 子進程55428執行完畢,主進程57784調用的我 17 正在執行子進程 58356 18 子進程58356執行完畢,主進程57784調用的我 19 正在執行子進程 56716 20 子進程56716執行完畢,主進程57784調用的我 21 正在執行子進程 57380 22 子進程57380執行完畢,主進程57784調用的我
?8.協程
又稱微線程,是一種用戶態的輕量級線程,協程擁有自己的寄存器和棧,協程調度切換時,將寄存器上下文和棧保存在其他地方,當切回來的時候,恢復先前保存的寄存器上下文和棧,因此,每一次協程恢復時,
都能在它離開中斷的地方繼續。
協程的好處:
- 無需線程上下文切換的開銷
- 無需原子操作鎖定及同步的開銷
- 方便切換控制流,簡化編程模型
- 高并發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用于高并發處理。
缺點:
- 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
- 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
用greenlet模塊實現協程(手動):
1 from greenlet import greenlet 2 def test1(): 3 print("test1") #4.打印 4 gr2.switch() #5.切換到test2,test2開始執行 5 print("test1...") #8.接上次離開的地方,開始執行,打印 6 gr2.switch() #9.切換到test2,test2開始執行 7 8 def test2(): 9 print("test2") #6.打印 10 gr1.switch() #7.切換到tets1,test1開始執行 11 print("test2...") #10.接上次離開的地方,開始執行,打印,程序結束 12 13 gr1 = greenlet(test1) #1.啟動一個協程 14 gr2 = greenlet(test2) #2.啟動一個協程 15 gr1.switch() #3.切換到test1,test1開始執行 16 17 輸出: 18 test1 19 test2 20 test1... 21 test2...
重要:用gevent模塊實現協程(自動切換)。遇到IO(程序睡覺或卡主)便自動切換到下一個協程
1 import gevent 2 3 4 def t1(): 5 print("t1 運行") #4.t1開始運行,打印 6 gevent.sleep(2) #5.切換到下一個協程,按生成的順序,切換到t2 7 #10.程序運行到這里,只花費了不到0.1秒鐘,這時t1還在睡覺,又觸發了切換操作,切換到t2 8 #13.程序運行到這里,只花費了不到0.1秒鐘,這時t1還在睡覺,又觸發了切換操作,切換到t2 9 print("再次回到t1") #16.接上次離開的地方,執行打印操作 10 11 12 def t2(): 13 print("t2 運行") #6.t2開始運行,打印 14 gevent.sleep(1) #7.切換到下一個協程,按生成的順序,切換到t3 15 #11.程序運行到這里,只花費了不到0.1秒鐘,這時t2還在睡覺,又觸發了切換操作,切換到t3 16 #14.程序運行到這里,只花費了不到0.1秒鐘,這時t2還在睡覺,又觸發了切換操作,但是t3運行 17 # 完畢,無法切換到t3,只能切換到t1,t1還在睡覺,又切換到t2,互相切換過了1秒后,t2睡覺 18 # 完畢,這時切換到t2 19 print("再次回到t2") #15.接上次離開的地方,執行打印操作,切換到t1 20 21 22 def t3(): 23 print("t3運行") #8.t3開始運行,打印 24 gevent.sleep(0) #9.雖然睡覺0秒,但是會觸發切換操作,切換到下一個協程,按生成的順序,切換到t1 25 print("再次回到t3") #12.由于t3沒有睡覺,執行打印操作。t3運行完畢,自動切換到t1 26 27 gevent.joinall([ 28 gevent.spawn(t1), #1.生成協程1 29 gevent.spawn(t2), #2.生成協程2 30 gevent.spawn(t3), #3.生成協程3 31 ]) 32 33 輸出:
t1 運行
t2 運行
t3運行
再次回到t3
再次回到t2
再次回到t1
?9.爬蟲,簡單下載網頁,使用gevent和gevent.monkey模塊(需python3.5以上,gevent在linux完美支持,在Windows可能會有問題)
1 from urllib import request 2 import gevent,time 3 from gevent import monkey 4 monkey.patch_all() #把當前程序的所有的io操作給gevent單獨的做上標記,使gevent能辨識出io操作,從而達到協程遇到io就切換的效果,不加monkey是不能識別出io的 5 6 def f(url): 7 print('GET: %s' % url) 8 resp = request.urlopen(url) 9 data = resp.read() 10 print('%d bytes received from %s.' % (len(data), url)) 11 12 urls = ['https://www.python.org/', 13 'https://www.yahoo.com/', 14 'https://github.com/' ] 15 time_start = time.time() 16 for url in urls: 17 f(url) 18 print("同步cost",time.time() - time_start) 19 async_time_start = time.time() 20 gevent.joinall([ 21 gevent.spawn(f, 'https://www.python.org/'), 22 gevent.spawn(f, 'https://www.yahoo.com/'), 23 gevent.spawn(f, 'https://github.com/'), 24 ]) 25 print("異步cost",time.time() - async_time_start)
?輸出:
1 GET: https://www.python.org/ 2 47403 bytes received from https://www.python.org/. 3 GET: https://www.yahoo.com/ 4 460629 bytes received from https://www.yahoo.com/. 5 GET: https://github.com/ 6 25734 bytes received from https://github.com/. 7 同步cost 5.380241632461548 8 GET: https://www.python.org/ 9 GET: https://www.yahoo.com/ 10 GET: https://github.com/ 11 47403 bytes received from https://www.python.org/. 12 430893 bytes received from https://www.yahoo.com/. 13 25734 bytes received from https://github.com/. 14 異步cost 1.2949371337890625
?10.基于協程的異步多并發
socket?server
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #通過協程實現并發 5 import sys 6 import socket 7 import time 8 import gevent 9 10 from gevent import socket,monkey 11 monkey.patch_all() 12 13 14 def server(port): 15 s = socket.socket() 16 s.bind(('0.0.0.0',port)) 17 s.listen(500)#最大連接數為500 18 while True: 19 cli, addr = s.accept() 20 gevent.spawn(handle_request, cli)# 創建協程,這里做了一個操作,將cli作為參數傳給handle_request, handle_request(cli) 21 22 23 def handle_request(conn): 24 try: 25 while True: 26 data = conn.recv(1024)#接收client的請求 27 28 with open("bbb.txt",'ab') as f:#接收文件內容,并寫入文件 29 f.write(data) 30 print("recv:", data) 31 # post = bytes('get your request: ',encoding='utf-8') 32 # post = post + data #post為要向client發送的數據,必須為bytes類型 33 conn.send(data) #向client發送數據 34 if not data: 35 conn.shutdown(socket.SHUT_WR) 36 37 except Exception as ex: 38 print(ex) 39 40 finally: 41 conn.close() 42 43 if __name__ == "__main__": 44 server(9000)
client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發送的內容,必須是bytes類型 13 s.sendall(msg) #發送數據 14 # filename = input("filename:>>") #注釋的內容是可以發送文件的 15 # with open(filename,'rb') as f:#發送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數據 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()
11.io多路復用,不是異步???基于select
server
1 #io多路復用實現并發,并不是異步 select 2 3 import select 4 import socket 5 import queue 6 7 8 server = socket.socket() 9 server.bind(('localhost',9000)) 10 server.listen(1000) 11 12 13 server.setblocking(False) #不阻塞 14 15 msg_dic = {} 16 17 inputs = [server, ]#新建鏈接的集合列表,實際情況是隨著連接的增多,列表中的conn會越來越多,而server只有一個 18 #inputs = [server,conn] #[conn,] 19 #inputs = [server,conn,conn2,conn3...] #實際情況是隨著連接的增多,列表中的conn會越來越多,而server只有一個 20 outputs = [] #要返回數據的客戶端連接的集合列表,比如conn1需要server給conn1返回請求數據,就將conn1放進outputs里 21 #outputs = [r1,] # 22 while True: 23 readable, writeable, exceptional = select.select(inputs, outputs, inputs )#select監測新建立的連接,readble是inputs列表中 24 #的元素集合,writeable是outputs列表中的元素集合,exceptional是異常連接的集合,它包含在inputs中 25 print(readable, writeable, exceptional) 26 for r in readable: 27 if r is server: #代表來了一個新連接 28 conn,addr = server.accept()#建立連接 29 print("來了個新連接",addr) 30 inputs.append(conn) #是因為這個新建立的連接還沒發數據過來,現在就接收的話程序就報錯了, 31 #所以要想實現這個客戶端發數據來時server端能知道,就需要讓select再監測這個conn,所以將新連接放到inputs里 32 msg_dic[conn] = queue.Queue() #初始化一個隊列,用來存放要返回給這個客戶端的數據 33 else: #conn2 這里不是新來了一個新連接,而是之前已經建立好的連接,是繼續要發送數據的客戶端連接 34 data = r.recv(1024) 35 print("收到數據",data.decode()) 36 msg_dic[r].put(data)#將要返回給客戶端的數據存放到隊列里 37 outputs.append(r) #將連接r 放入要返回數據的連接隊列里,如果r不是需要返回數據的連接,就可以不將r放進outputs中 38 39 for w in writeable: #要返回給客戶端的連接列表 40 data_to_client = msg_dic[w].get()#從隊列里取出要返回給客戶端的數據 41 w.send(data_to_client) #給客戶端返回數據 42 outputs.remove(w) #確保下次循環writeable的時候,不返回這個已經處理完的連接了 43 44 for e in exceptional:#有異常的客戶端連接列表,稱之為壞連接 45 if e in outputs: #如果壞連接在返回數據列表里,就將其去掉,否則會有錯誤 46 outputs.remove(e) 47 48 inputs.remove(e) #壞連接一定在inputs中,所以不需要判斷,因為所有連接都是最先存放在inputs中 49 50 del msg_dic[e] #同時,將隊列中的壞連接也去掉(如果隊列中存在的話)
client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發送的內容,必須是bytes類型 13 s.sendall(msg) #發送數據 14 # filename = input("filename:>>") #注釋的內容是可以發送文件的 15 # with open(filename,'rb') as f:#發送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數據 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()
12.??重要要會:IO多路復用之? selector
server
1 #io多路復用實現并發,并不是異步 selector最好 2 import selectors 3 import socket 4 5 sel = selectors.DefaultSelector() 6 7 8 def accept(sock, mask): 9 conn, addr = sock.accept() # 新建一個連接 10 print('accepted', conn, 'from', addr,mask) 11 conn.setblocking(False) #設置為非阻塞 12 sel.register(conn, selectors.EVENT_READ, read) #將新連接注冊到sel實例中,由sel監測新連接conn,read為回調函數,即新連接conn將會調用read函數 13 14 15 def read(conn, mask): 16 data = conn.recv(1024) # Should be ready 17 if data: 18 print('echoing', repr(data), 'to', conn) 19 conn.send(data) # Hope it won't block 20 else: #如果沒有數據接收進來 21 print('closing', conn) 22 sel.unregister(conn) #取消注冊該連接conn 23 conn.close() 24 25 26 sock = socket.socket() 27 sock.bind(('localhost', 9000)) 28 sock.listen(100) 29 sock.setblocking(False) 30 sel.register(sock, selectors.EVENT_READ, accept) #將sock注冊到sel實例中,由sel監測sock,accept為回調函數,即sock要活動的話,
?#將會調用accept函數去新建一個連接 31 32 while True: 33 events = sel.select() #默認阻塞,有活動連接就返回活動的連接列表,events是一個列表,里邊存放所有的連接 34 for key, mask in events: #mask是連接的順序數 35 callback = key.data #調用accept函數 36 callback(key.fileobj, mask) #key.fileobj= 文件句柄
client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發送的內容,必須是bytes類型 13 s.sendall(msg) #發送數據 14 # filename = input("filename:>>") #注釋的內容是可以發送文件的 15 # with open(filename,'rb') as f:#發送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數據 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()
?