日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Python任務(wù)調(diào)度利器:APScheduler

任務(wù)調(diào)度應(yīng)用場景

創(chuàng)新互聯(lián)公司從2013年創(chuàng)立,先為天寧等服務(wù)建站,天寧等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為天寧企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。

所謂的任務(wù)調(diào)度是指安排任務(wù)的執(zhí)行計(jì)劃,即何時(shí)執(zhí)行,怎么執(zhí)行等。在現(xiàn)實(shí)項(xiàng)目中經(jīng)常出現(xiàn)它們的身影;特別是數(shù)據(jù)類項(xiàng)目,比如實(shí)時(shí)統(tǒng)計(jì)每5分鐘網(wǎng)站的訪問量,就需要每5分鐘定時(shí)從日志數(shù)據(jù)分析訪問量。

總結(jié)下任務(wù)調(diào)度應(yīng)用場景:

  •  離線作業(yè)調(diào)度:按時(shí)間粒度執(zhí)行某項(xiàng)任務(wù)
  •  共享緩存更新:定時(shí)刷新緩存,如redis緩存;不同進(jìn)程間的共享數(shù)據(jù)

任務(wù)調(diào)度工具

  •  linux的crontab, 支持按照分鐘/小時(shí)/天/月/周粒度,執(zhí)行任務(wù)
  •  java的Quartz
  •  windows的任務(wù)計(jì)劃

本文介紹的是python中的任務(wù)調(diào)度庫,APScheduler(advance python scheduler)。如果你了解Quartz的話,可以看出APScheduler是Quartz的python實(shí)現(xiàn);APScheduler提供了基于時(shí)間,固定時(shí)間點(diǎn)和crontab方式的任務(wù)調(diào)用方案, 可以當(dāng)作一個(gè)跨平臺的調(diào)度工具來使用。

APScheduler

組件介紹

APScheduler由5個(gè)部分組成:觸發(fā)器、調(diào)度器、任務(wù)存儲器、執(zhí)行器和任務(wù)事件。

  •  任務(wù)job:任務(wù)id和任務(wù)執(zhí)行func
  •  觸發(fā)器triggers:確定任務(wù)何時(shí)開始執(zhí)行
  •  任務(wù)存儲器job stores: 保存任務(wù)的狀態(tài)
  •  執(zhí)行器executors:確定任務(wù)怎么執(zhí)行
  •  任務(wù)事件event:監(jiān)控任務(wù)執(zhí)行異常情況
  •  調(diào)度器schedulers:串聯(lián)任務(wù)的整個(gè)生命周期,添加編輯任務(wù)到任務(wù)存儲器,在任務(wù)的執(zhí)行時(shí)間到來時(shí),把任務(wù)交給執(zhí)行器執(zhí)行返回結(jié)果;同時(shí)發(fā)出事件監(jiān)聽,監(jiān)控任務(wù)事件 。

安裝

 
 
 
 
  1. pip install apscheduler 

簡單例子

 
 
 
 
  1. from apscheduler.schedulers.background import BackgroundScheduler  
  2. from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor  
  3. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore  
  4. from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR  
  5. import logging  
  6. import datetime  
  7. # 任務(wù)執(zhí)行函數(shù)  
  8. def job_func(job_id):  
  9.     print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))  
  10. # 事件監(jiān)聽  
  11. def job_exception_listener(event):  
  12.     if event.exception:  
  13.         # todo:異常處理, 告警等  
  14.         print('The job crashed :(')  
  15.     else:  
  16.         print('The job worked :)')  
  17. # 日志  
  18. logging.basicConfig()  
  19. logging.getLogger('apscheduler').setLevel(logging.DEBUG)  
  20. # 定義一個(gè)后臺任務(wù)非阻塞調(diào)度器  
  21. scheduler = BackgroundScheduler()  
  22. # 添加一個(gè)任務(wù)到內(nèi)存中   
  23. # 觸發(fā)器:trigger='interval' seconds=10 每10s觸發(fā)執(zhí)行一次  
  24. # 執(zhí)行器:executor='default' 線程執(zhí)行  
  25. # 任務(wù)存儲器:jobstore='default' 默認(rèn)內(nèi)存存儲  
  26. # 最大并發(fā)數(shù):max_instances  
  27. scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)  
  28. # 設(shè)置任務(wù)監(jiān)聽  
  29. scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)  
  30. # 啟動調(diào)度器  
  31. scheduler.start() 

運(yùn)行情況:

 
 
 
 
  1. job 1 is runed at 2020-03-21 20:00:38  
  2. The job worked :)  
  3. job 1 is runed at 2020-03-21 20:00:48  
  4. The job worked :)  
  5. job 1 is runed at 2020-03-21 20:00:58  
  6. The job worked :) 

觸發(fā)器

觸發(fā)器決定何時(shí)執(zhí)行任務(wù),APScheduler支持的觸發(fā)器有3種

  •  trigger='interval':按固定時(shí)間周期執(zhí)行,支持weeks,days,hours,minutes, seconds, 還可指定時(shí)間范圍   
 
 
 
 
  1. sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00') 
  •  trigger='date': 固定時(shí)間,執(zhí)行一次   
 
 
 
 
  1. sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text']) 
  •   trigger='cron': 支持crontab方式,執(zhí)行任務(wù)
  •   參數(shù):分鐘/小時(shí)/天/月/周粒度,也可指定時(shí)間范圍     
 
 
 
 
  1. year (int|str) – 4-digit year  
  2.       month (int|str) – month (1-12)  
  3.       day (int|str) – day of the (1-31)  
  4.       week (int|str) – ISO week (1-53)  
  5.       day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)  
  6.       hour (int|str) – hour (0-23)  
  7.       minute (int|str) – minute (0-59)  
  8.       second (int|str) – second (0-59)  
  9.       start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)  
  10.       end_date (datetime|str) – latest possible date/time to trigger on (inclusive) 
  •   例子           
 
 
 
 
  1. # 星期一到星期五,5點(diǎn)30執(zhí)行任務(wù)job_function,直到2014-05-30 00:00:00  
  2.            sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')  
  3.            # 按照crontab格式執(zhí)行, 格式為:分鐘 小時(shí) 天 月 周,*表示所有  
  4.            # 5月到8月的1號到15號,0點(diǎn)0分執(zhí)行任務(wù)job_function  
  5.            sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *')) 

執(zhí)行器

執(zhí)行器決定如何執(zhí)行任務(wù);APScheduler支持4種不同執(zhí)行器,常用的有pool(線程/進(jìn)程)和gevent(io多路復(fù)用,支持高并發(fā)),默認(rèn)為pool中線程池, 不同的執(zhí)行器可以在調(diào)度器的配置中進(jìn)行配置(見調(diào)度器)

  •  apscheduler.executors.asyncio:同步io,阻塞
  •  apscheduler.executors.gevent:io多路復(fù)用,非阻塞
  •  apscheduler.executors.pool: 線程ThreadPoolExecutor和進(jìn)程ProcessPoolExecutor
  •  apscheduler.executors.twisted:基于事件驅(qū)動

任務(wù)存儲器

任務(wù)存儲器決定任務(wù)的保存方式, 默認(rèn)存儲在內(nèi)存中(MemoryJobStore),重啟后就沒有了。APScheduler支持的任務(wù)存儲器有:

  •  apscheduler.jobstores.memory:內(nèi)存
  •  apscheduler.jobstores.mongodb:存儲在mongodb
  •  apscheduler.jobstores.redis:存儲在redis
  •  apscheduler.jobstores.rethinkdb:存儲在rethinkdb
  •  apscheduler.jobstores.sqlalchemy:支持sqlalchemy的數(shù)據(jù)庫如mysql,sqlite等
  •  apscheduler.jobstores.zookeeper:zookeeper

不同的任務(wù)存儲器可以在調(diào)度器的配置中進(jìn)行配置(見調(diào)度器)

調(diào)度器

APScheduler支持的調(diào)度器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler

  •  BlockingScheduler:適用于調(diào)度程序是進(jìn)程中唯一運(yùn)行的進(jìn)程,調(diào)用start函數(shù)會阻塞當(dāng)前線程,不能立即返回。
  •  BackgroundScheduler:適用于調(diào)度程序在應(yīng)用程序的后臺運(yùn)行,調(diào)用start后主線程不會阻塞。
  •  AsyncIOScheduler:適用于使用了asyncio模塊的應(yīng)用程序。
  •  GeventScheduler:適用于使用gevent模塊的應(yīng)用程序。
  •  TwistedScheduler:適用于構(gòu)建Twisted的應(yīng)用程序。
  •  QtScheduler:適用于構(gòu)建Qt的應(yīng)用程序。

從前面的例子,我們可以看到,調(diào)度器可以操作任務(wù)(并為任務(wù)指定觸發(fā)器、任務(wù)存儲器和執(zhí)行器)和監(jiān)控任務(wù)。

 
 
 
 
  1. scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10) 

我們來詳細(xì)看下各個(gè)部分

  •  調(diào)度器配置:在add_job我們看到j(luò)obstore和executor都是default,APScheduler在定義調(diào)度器時(shí)可以指定不同的任務(wù)存儲和執(zhí)行器,以及初始的參數(shù)   
 
 
 
 
  1. from pytz import utc  
  2.    from apscheduler.schedulers.background import BackgroundScheduler  
  3.    from apscheduler.jobstores.mongodb import MongoDBJobStore  
  4.    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore  
  5.    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor  
  6.    # 通過dict方式執(zhí)行不同的jobstores、executors和默認(rèn)的參數(shù)  
  7.    jobstores = {  
  8.        'mongo': MongoDBJobStore(),  
  9.        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  
  10.    }  
  11.    executors = {  
  12.        'default': ThreadPoolExecutor(20),  
  13.        'processpool': ProcessPoolExecutor(5)  
  14.    }  
  15.    job_defaults = {  
  16.        'coalesce': False,  
  17.        'max_instances': 3  
  18.    }  
  19.    # 定義調(diào)度器  
  20.    scheduler = BackgroundScheduler(jobstoresjobstores=jobstores, executorsexecutors=executors, job_defaultsjob_defaults=job_defaults, timezone=utc)  
  21.    def job_func(job_id):  
  22.        print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))  
  23.    # 添加任務(wù)  
  24.    scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)  
  25.    # 啟動調(diào)度器  
  26.    scheduler.start() 
  •  操作任務(wù):調(diào)度器可以增加,刪除,暫停,恢復(fù)和修改任務(wù)。需要注意的是這里的操作只是對未執(zhí)行的任務(wù)起作用,已經(jīng)執(zhí)行和正在執(zhí)行的任務(wù)不受這些操作的影響。
  •   add_job     
 
 
 
 
  1. scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10) 
  •   remove_job: 通過任務(wù)唯一的id,刪除的時(shí)候?qū)?yīng)的任務(wù)存儲器里記錄也會刪除 
 
 
 
 
  1. scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  
  2.  scheduler.remove_job('my_job_id') 
  •   Pausing and resuming jobs:暫停和重啟任務(wù)       
 
 
 
 
  1. scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  
  2.         scheduler.pause_job('my_job_id')  
  3.         scheduler.resume_job('my_job_id') 
  •   Modifying jobs:修改任務(wù)的配置       
 
 
 
 
  1. job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)  
  2.        # 修改任務(wù)的屬性  
  3.        job.modify(max_instances=6, name='Alternate name')  
  4.        # 修改任務(wù)的觸發(fā)器  
  5.        scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5') 
  •  監(jiān)控任務(wù)事件類型,比較常用的類型有:
    •   EVENT_JOB_ERROR: 表示任務(wù)在執(zhí)行過程的出現(xiàn)異常觸發(fā)
    •   EVENT_JOB_EXECUTED:任務(wù)執(zhí)行成功時(shí)
    •   EVENT_JOB_MAX_INSTANCES:調(diào)度器上執(zhí)行的任務(wù)超過配置的參數(shù)時(shí)       
 
 
 
 
  1. scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)    

文章標(biāo)題:Python任務(wù)調(diào)度利器:APScheduler
文章轉(zhuǎn)載:http://www.dlmjj.cn/article/dpipeei.html