日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
一篇文章帶你了解Python的分布式進(jìn)程接口

一篇文章帶你了解python的分布式進(jìn)程接口

作者:Go進(jìn)階者 2021-05-12 16:09:18

開(kāi)發(fā)

后端

分布式 本文基于Python基礎(chǔ),Python的分布式進(jìn)程接口簡(jiǎn)單,封裝良好,適合需要把繁重任務(wù)分布到多臺(tái)機(jī)器的環(huán)境下。通過(guò)講解Queue的作用是用來(lái)傳遞任務(wù)和接收結(jié)果。

一、前言

在Thread和Process中,應(yīng)當(dāng)優(yōu)選Process,因?yàn)镻rocess更穩(wěn)定,而且,Process可以分布到多臺(tái)機(jī)器上,而Thread最多只能分布到同一臺(tái)機(jī)器的多個(gè)CPU上。

Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。可以寫(xiě)一個(gè)服務(wù)進(jìn)程作為調(diào)度者,將任務(wù)分布到其他多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信進(jìn)行管理。

二、案例分析

在做爬蟲(chóng)程序時(shí),抓取某個(gè)網(wǎng)站的所有圖片,如果使用多進(jìn)程的話(huà),一般是一個(gè)進(jìn)程負(fù)責(zé)抓取圖片的鏈接地址,將鏈接地址放到queue中,另外的進(jìn)程負(fù)責(zé) 從queue中取鏈接地址進(jìn)行下載和存儲(chǔ)到本地。

怎么用分布式進(jìn)程實(shí)現(xiàn)?

一臺(tái)機(jī)器上的進(jìn)程負(fù)責(zé)抓取鏈接地址,其他機(jī)器上的進(jìn)程負(fù)責(zé)系在存儲(chǔ)。那么遇到的主要問(wèn)題是將queue 暴露到網(wǎng)絡(luò)中,讓其他機(jī)器進(jìn)程都可以訪問(wèn),分布式進(jìn)程就是將這個(gè)過(guò)程進(jìn)行了封裝,可以將這個(gè)過(guò)程稱(chēng)為本地隊(duì)列的網(wǎng)絡(luò)化。

例:

1.py

  
 
 
 
  1. from multiprocessing.managers import BaseManager 
  2. from multiprocessing import freeze_support, Queue 
  3. # 任務(wù)個(gè)數(shù) 
  4. task_number = 10 
  5.  
  6. # 收發(fā)隊(duì)列 
  7. task_quue = Queue(task_number) 
  8. result_queue = Queue(task_number) 
  9.  
  10. def get_task(): 
  11.     return task_quue 
  12.  
  13. def get_result(): 
  14.     return result_queue 
  15. # 創(chuàng)建類(lèi)似的queueManager 
  16. class QueueManager(BaseManager): 
  17.     pass 
  18.  
  19. def win_run(): 
  20.     # 注冊(cè)在網(wǎng)絡(luò)上,callable 關(guān)聯(lián)了Queue 對(duì)象 
  21.     # 將Queue對(duì)象在網(wǎng)絡(luò)中暴露 
  22.     # window下綁定調(diào)用接口不能直接使用lambda,所以只能先定義函數(shù)再綁定 
  23.     QueueManager.register('get_task_queue', callable=get_task) 
  24.     QueueManager.register('get_result_queue', callable=get_result) 
  25.     # 綁定端口和設(shè)置驗(yàn)證口令 
  26.     manager = QueueManager(address=('127.0.0.1', 8001), authkey='qiye'.encode()) 
  27.     # 啟動(dòng)管理,監(jiān)聽(tīng)信息通道 
  28.     manager.start() 
  29.  
  30.     try: 
  31.         # 通過(guò)網(wǎng)絡(luò)獲取任務(wù)隊(duì)列和結(jié)果隊(duì)列 
  32.         task = manager.get_task_queue() 
  33.         result = manager.get_result_queue() 
  34.  
  35.         # 添加任務(wù) 
  36.         for url in ["ImageUrl_" + str(i) for i in range(10)]: 
  37.             print('url is %s' % url) 
  38.             task.put(url) 
  39.              
  40.         print('try get result') 
  41.         for i in range(10): 
  42.             print('result is %s' % result.get(timeout=10)) 
  43.  
  44.     except: 
  45.         print('Manager error') 
  46.     finally: 
  47.         manager.shutdown() 
  48.  
  49. if __name__ == '__main__': 
  50.     freeze_support() 
  51.     win_run() 

連接服務(wù)器,端口和驗(yàn)證口令注意保持與服務(wù)器進(jìn)程中完全一致從網(wǎng)絡(luò)獲取Queue,進(jìn)行本地化,從task隊(duì)列獲取任務(wù),并且把結(jié)果寫(xiě)入result隊(duì)列

2.py

  
 
 
 
  1. #coding:utf-8 
  2. import time 
  3. from multiprocessing.managers import BaseManager 
  4. # 創(chuàng)建類(lèi)似的Manager: 
  5. class Manager(BaseManager): 
  6.     pass 
  7. #使用QueueManager注冊(cè)獲取Queue的方法名稱(chēng) 
  8. Manager.register('get_task_queue') 
  9. Manager.register('get_result_queue') 
  10. #連接到服務(wù)器: 
  11. server_addr = '127.0.0.1' 
  12. print('Connect to server %s...' % server_addr) 
  13. # 端口和驗(yàn)證口令注意保持與服務(wù)進(jìn)程設(shè)置的完全一致: 
  14. m = Manager(address=(server_addr, 8001), authkey='qiye') 
  15. # 從網(wǎng)絡(luò)連接: 
  16. m.connect() 
  17. #獲取Queue的對(duì)象: 
  18. task = m.get_task_queue() 
  19. result = m.get_result_queue() 
  20. #從task隊(duì)列取任務(wù),并把結(jié)果寫(xiě)入result隊(duì)列: 
  21. while(not task.empty()): 
  22.         image_url = task.get(True,timeout=5) 
  23.         print('run task download %s...' % image_url) 
  24.         time.sleep(1) 
  25.         result.put('%s--->success'%image_url) 
  26. #結(jié)束: 
  27. print('worker exit.') 

任務(wù)進(jìn)程要通過(guò)網(wǎng)絡(luò)連接到服務(wù)進(jìn)程,所以要指定服務(wù)進(jìn)程的IP。

運(yùn)行結(jié)果如下:

獲取圖片地址,將地址傳到2.py。

接收1.py傳遞的地址,進(jìn)行圖片的下載,控制臺(tái)顯示爬取結(jié)果。

三、總結(jié)

本文基于Python基礎(chǔ),Python的分布式進(jìn)程接口簡(jiǎn)單,封裝良好,適合需要把繁重任務(wù)分布到多臺(tái)機(jī)器的環(huán)境下。通過(guò)講解Queue的作用是用來(lái)傳遞任務(wù)和接收結(jié)果。

歡迎大家積極嘗試,有時(shí)候看到別人實(shí)現(xiàn)起來(lái)很簡(jiǎn)單,但是到自己動(dòng)手實(shí)現(xiàn)的時(shí)候,總會(huì)有各種各樣的問(wèn)題,切勿眼高手低,勤動(dòng)手,才可以理解的更加深刻。


網(wǎng)站欄目:一篇文章帶你了解Python的分布式進(jìn)程接口
網(wǎng)站URL:http://www.dlmjj.cn/article/dpcpidp.html