欧美性猛交XXXX免费看蜜桃,成人网18免费韩国,亚洲国产成人精品区综合,欧美日韩一区二区三区高清不卡,亚洲综合一区二区精品久久

打開(kāi)APP
userphoto
未登錄

開(kāi)通VIP,暢享免費電子書(shū)等14項超值服

開(kāi)通VIP
Python 實(shí)現定時(shí)任務(wù)的八種方案!
作者:錢(qián)魏Way

來(lái)源:https://www.biaodianfu.com/python-schedule.html

在日常工作中,我們常常會(huì )用到需要周期性執行的任務(wù),一種方式是采用 Linux 系統自帶的 crond 結合命令行實(shí)現。另外一種方式是直接使用Python。接下來(lái)整理的是常見(jiàn)的Python定時(shí)任務(wù)的實(shí)現方式。

目錄

  • 利用while True: + sleep()實(shí)現定時(shí)任務(wù)
  • 使用Timeloop庫運行定時(shí)任務(wù)
  • 利用threading.Timer實(shí)現定時(shí)任務(wù)
  • 利用內置模塊sched實(shí)現定時(shí)任務(wù)
  • 利用調度模塊schedule實(shí)現定時(shí)任務(wù)
  • 利用任務(wù)框架APScheduler實(shí)現定時(shí)任務(wù)
    • Job 作業(yè)
    • Trigger 觸發(fā)器
    • Executor 執行器
    • Jobstore 作業(yè)存儲
    • Event 事件
    • 調度器
    • APScheduler中的重要概念
    • Scheduler的工作流程
  • 使用分布式消息系統Celery實(shí)現定時(shí)任務(wù)
  • 使用數據流工具Apache Airflow實(shí)現定時(shí)任務(wù)
    • Airflow 產(chǎn)生的背景
    • Airflow 核心概念
    • Airflow 的架構

利用while True: + sleep()實(shí)現定時(shí)任務(wù)

位于 time 模塊中的 sleep(secs) 函數,可以實(shí)現令當前執行的線(xiàn)程暫停 secs 秒后再繼續執行。所謂暫停,即令當前線(xiàn)程進(jìn)入阻塞狀態(tài),當達到 sleep() 函數規定的時(shí)間后,再由阻塞狀態(tài)轉為就緒狀態(tài),等待 CPU 調度。

基于這樣的特性我們可以通過(guò)while死循環(huán)+sleep()的方式實(shí)現簡(jiǎn)單的定時(shí)任務(wù)。

代碼示例:

import datetime
import time
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
def loop_monitor():
    while True:
        time_printer()
        time.sleep(5)  # 暫停5秒
if __name__ == '__main__':
    loop_monitor()

主要缺點(diǎn):

  • 只能設定間隔,不能指定具體的時(shí)間,比如每天早上8:00
  • sleep 是一個(gè)阻塞函數,也就是說(shuō) sleep 這一段時(shí)間,程序什么也不能操作。

使用Timeloop庫運行定時(shí)任務(wù)

Timeloop是一個(gè)庫,可用于運行多周期任務(wù)。這是一個(gè)簡(jiǎn)單的庫,它使用decorator模式在線(xiàn)程中運行標記函數。

示例代碼:

import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print '2s job current time : {}'.format(time.ctime())
@tl.job(interval=timedelta(seconds=5))
def sample_job_every_5s():
    print '5s job current time : {}'.format(time.ctime())
@tl.job(interval=timedelta(seconds=10))
def sample_job_every_10s():
    print '10s job current time : {}'.format(time.ctime())

利用threading.Timer實(shí)現定時(shí)任務(wù)

threading 模塊中的 Timer 是一個(gè)非阻塞函數,比 sleep 稍好一點(diǎn),timer最基本理解就是定時(shí)器,我們可以啟動(dòng)多個(gè)定時(shí)任務(wù),這些定時(shí)器任務(wù)是異步執行,所以不存在等待順序執行問(wèn)題。

Timer(interval, function, args=[ ], kwargs={ })

  • interval: 指定的時(shí)間
  • function: 要執行的方法
  • args/kwargs: 方法的參數

代碼示例:


備注:Timer只能執行一次,這里需要循環(huán)調用,否則只能執行一次

利用內置模塊sched實(shí)現定時(shí)任務(wù)

sched模塊實(shí)現了一個(gè)通用事件調度器,在調度器類(lèi)使用一個(gè)延遲函數等待特定的時(shí)間,執行任務(wù)。同時(shí)支持多線(xiàn)程應用程序,在每個(gè)任務(wù)執行后會(huì )立刻調用延時(shí)函數,以確保其他線(xiàn)程也能執行。

class sched.scheduler(timefunc, delayfunc)這個(gè)類(lèi)定義了調度事件的通用接口,它需要外部傳入兩個(gè)參數,timefunc是一個(gè)沒(méi)有參數的返回時(shí)間類(lèi)型數字的函數(常用使用的如time模塊里面的time),delayfunc應該是一個(gè)需要一個(gè)參數來(lái)調用、與timefunc的輸出兼容、并且作用為延遲多個(gè)時(shí)間單位的函數(常用的如time模塊的sleep)。

代碼示例:

import datetime
import time
import sched
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
def loop_monitor():
    s = sched.scheduler(time.time, time.sleep)  # 生成調度器
    s.enter(51, time_printer, ())
    s.run()
if __name__ == '__main__':
    loop_monitor()

scheduler對象主要方法:

  • enter(delay, priority, action, argument),安排一個(gè)事件來(lái)延遲delay個(gè)時(shí)間單位。
  • cancel(event):從隊列中刪除事件。如果事件不是當前隊列中的事件,則該方法將跑出一個(gè)ValueError。
  • run():運行所有預定的事件。這個(gè)函數將等待(使用傳遞給構造函數的delayfunc()函數),然后執行事件,直到不再有預定的事件。

個(gè)人點(diǎn)評:比threading.Timer更好,不需要循環(huán)調用。

利用調度模塊schedule實(shí)現定時(shí)任務(wù)

schedule是一個(gè)第三方輕量級的任務(wù)調度模塊,可以按照秒,分,小時(shí),日期或者自定義事件執行時(shí)間。schedule允許用戶(hù)使用簡(jiǎn)單、人性化的語(yǔ)法以預定的時(shí)間間隔定期運行Python函數(或其它可調用函數)。

先來(lái)看代碼,是不是不看文檔就能明白什么意思?

import schedule
import time
def job():
    print('I'm working...')
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at('10:30').do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at('13:15').do(job)
schedule.every().minute.at(':17').do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

裝飾器:通過(guò) @repeat() 裝飾靜態(tài)方法

import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
    print('working...')
while True:
    run_pending()
    time.sleep(1)

傳遞參數:

import schedule
def greet(name):
    print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
    schedule.run_pending()

裝飾器同樣能傳遞參數:

from schedule import every, repeat, run_pending
@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)
while True:
    run_pending()

取消任務(wù):

import schedule
i = 0
def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()

運行一次任務(wù):

import time
import schedule
def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob
schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)

根據標簽檢索任務(wù):

# 檢索所有任務(wù):schedule.get_jobs()
import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks''friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks''friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks''customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks''guest')
friends = schedule.get_jobs('friend')
print(friends)

根據標簽取消任務(wù):

# 取消所有任務(wù):schedule.clear()
import schedule
def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks''friend')
schedule.every().second.do(greet, 'John').tag('second-tasks''friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks''customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks''guest')
while True:
    schedule.run_pending()

運行任務(wù)到某時(shí)間:

import schedule
from datetime import datetime, timedelta, time
def job():
    print('working...')
schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小時(shí)后停止
schedule.every().second.until(time(235959)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(20301118300)).do(job)  # 2030-01-01 18:30停止
while True:
    schedule.run_pending()

馬上運行所有任務(wù)(主要用于測試):

import schedule
def job():
    print('working...')
def job1():
    print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任務(wù)間延遲3秒

并行運行:使用 Python 內置隊列實(shí)現:

import threading
import time
import schedule
def job1():
    print('I'm running on thread %s' % threading.current_thread())
def job2():
    print('I'm running on thread %s' % threading.current_thread())
def job3():
    print('I'm running on thread %s' % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)

利用任務(wù)框架APScheduler實(shí)現定時(shí)任務(wù)

APScheduler(advanceded python scheduler)基于Quartz的一個(gè)Python定時(shí)任務(wù)框架,實(shí)現了Quartz的所有功能,使用起來(lái)十分方便。提供了基于日期、固定時(shí)間間隔以及crontab類(lèi)型的任務(wù),并且可以持久化任務(wù)?;谶@些功能,我們可以很方便的實(shí)現一個(gè)Python定時(shí)任務(wù)系統。

它有以下三個(gè)特點(diǎn):

  • 類(lèi)似于 Liunx Cron 的調度程序(可選的開(kāi)始/結束時(shí)間)
  • 基于時(shí)間間隔的執行調度(周期性調度,可選的開(kāi)始/結束時(shí)間)
  • 一次性執行任務(wù)(在設定的日期/時(shí)間運行一次任務(wù))

APScheduler有四種組成部分:

  • 觸發(fā)器(trigger) 包含調度邏輯,每一個(gè)作業(yè)有它自己的觸發(fā)器,用于決定接下來(lái)哪一個(gè)作業(yè)會(huì )運行。除了他們自己初始配置意外,觸發(fā)器完全是無(wú)狀態(tài)的。
  • 作業(yè)存儲(job store) 存儲被調度的作業(yè),默認的作業(yè)存儲是簡(jiǎn)單地把作業(yè)保存在內存中,其他的作業(yè)存儲是將作業(yè)保存在數據庫中。一個(gè)作業(yè)的數據講在保存在持久化作業(yè)存儲時(shí)被序列化,并在加載時(shí)被反序列化。調度器不能分享同一個(gè)作業(yè)存儲。
  • 執行器(executor) 處理作業(yè)的運行,他們通常通過(guò)在作業(yè)中提交制定的可調用對象到一個(gè)線(xiàn)程或者進(jìn)城池來(lái)進(jìn)行。當作業(yè)完成時(shí),執行器將會(huì )通知調度器。
  • 調度器(scheduler) 是其他的組成部分。你通常在應用只有一個(gè)調度器,應用的開(kāi)發(fā)者通常不會(huì )直接處理作業(yè)存儲、調度器和觸發(fā)器,相反,調度器提供了處理這些的合適的接口。配置作業(yè)存儲和執行器可以在調度器中完成,例如添加、修改和移除作業(yè)。通過(guò)配置executor、jobstore、trigger,使用線(xiàn)程池(ThreadPoolExecutor默認值20)或進(jìn)程池(ProcessPoolExecutor 默認值5)并且默認最多3個(gè)(max_instances)任務(wù)實(shí)例同時(shí)運行,實(shí)現對job的增刪改查等調度控制

示例代碼:

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
# 輸出時(shí)間
def job():
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# BlockingScheduler
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
sched.start()

APScheduler中的重要概念

Job 作業(yè)

Job作為APScheduler最小執行單位。創(chuàng )建Job時(shí)指定執行的函數,函數中所需參數,Job執行時(shí)的一些設置信息。

構建說(shuō)明:

  • id:指定作業(yè)的唯一ID
  • name:指定作業(yè)的名字
  • trigger:apscheduler定義的觸發(fā)器,用于確定Job的執行時(shí)間,根據設置的trigger規則,計算得到下次執行此job的時(shí)間, 滿(mǎn)足時(shí)將會(huì )執行
  • executor:apscheduler定義的執行器,job創(chuàng )建時(shí)設置執行器的名字,根據字符串你名字到scheduler獲取到執行此job的 執行器,執行job指定的函數
  • max_instances:執行此job的最大實(shí)例數,executor執行job時(shí),根據job的id來(lái)計算執行次數,根據設置的最大實(shí)例數來(lái)確定是否可執行
  • next_run_time:Job下次的執行時(shí)間,創(chuàng )建Job時(shí)可以指定一個(gè)時(shí)間[datetime],不指定的話(huà)則默認根據trigger獲取觸發(fā)時(shí)間
  • misfire_grace_time:Job的延遲執行時(shí)間,例如Job的計劃執行時(shí)間是21:00:00,但因服務(wù)重啟或其他原因導致21:00:31才執行,如果設置此key為40,則該job會(huì )繼續執行,否則將會(huì )丟棄此job
  • coalesce:Job是否合并執行,是一個(gè)bool值。例如scheduler停止20s后重啟啟動(dòng),而job的觸發(fā)器設置為5s執行一次,因此此job錯過(guò)了4個(gè)執行時(shí)間,如果設置為是,則會(huì )合并到一次執行,否則會(huì )逐個(gè)執行
  • func:Job執行的函數
  • args:Job執行函數需要的位置參數
  • kwargs:Job執行函數需要的關(guān)鍵字參數

Trigger 觸發(fā)器

Trigger綁定到Job,在scheduler調度篩選Job時(shí),根據觸發(fā)器的規則計算出Job的觸發(fā)時(shí)間,然后與當前時(shí)間比較確定此Job是否會(huì )被執行,總之就是根據trigger規則計算出下一個(gè)執行時(shí)間。

目前APScheduler支持觸發(fā)器:

  • 指定時(shí)間的DateTrigger
  • 指定間隔時(shí)間的IntervalTrigger
  • 像Linux的crontab一樣的CronTrigger。

觸發(fā)器參數:date

date定時(shí),作業(yè)只執行一次。

  • run_date (datetime|str) – the date/time to run the job at
  • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
sched.add_job(my_job, 'date', run_date=date(2009116), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(20197616305), args=['text'])

觸發(fā)器參數:interval

interval間隔調度

  • weeks (int) –  間隔幾周
  • days (int) –  間隔幾天
  • hours (int) –  間隔幾小時(shí)
  • minutes (int) –  間隔幾分鐘
  • seconds (int) –  間隔多少秒
  • start_date (datetime|str) –  開(kāi)始日期
  • end_date (datetime|str) –  結束日期
  • timezone (datetime.tzinfo|str) –  時(shí)區
sched.add_job(job_function, 'interval', hours=2)

觸發(fā)器參數:cron

cron調度

  • (int|str) 表示參數既可以是int類(lèi)型,也可以是str類(lèi)型
  • (datetime | str) 表示參數既可以是datetime類(lèi)型,也可以是str類(lèi)型
  • year (int|str) – 4-digit year -(表示四位數的年份,如2008年)
  • month (int|str) – month (1-12) -(表示取值范圍為1-12月)
  • day (int|str) – day of the (1-31) -(表示取值范圍為1-31日)
  • week (int|str) – ISO week (1-53) -(格里歷2006年12月31日可以寫(xiě)成2006年-W52-7(擴展形式)或2006W527(緊湊形式))
  • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第幾天,既可以用0-6表示也可以用其英語(yǔ)縮寫(xiě)表示)
  • hour (int|str) – hour (0-23) – (表示取值范圍為0-23時(shí))
  • minute (int|str) – minute (0-59) – (表示取值范圍為0-59分)
  • second (int|str) – second (0-59) – (表示取值范圍為0-59秒)
  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示開(kāi)始時(shí)間)
  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示結束時(shí)間)
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示時(shí)區取值)

CronTrigger可用的表達式:

表達式參數類(lèi)型描述
*所有通配符。例:minutes=*即每分鐘觸發(fā)
* / a所有每隔時(shí)長(cháng)a執行一次。例:minutes=”* / 3″ 即每隔3分鐘執行一次
a – b所有a – b的范圍內觸發(fā)。例:minutes=“2-5”。即2到5分鐘內每分鐘執行一次
a – b / c所有a – b范圍內,每隔時(shí)長(cháng)c執行一次。
xth y第幾個(gè)星期幾觸發(fā)。x為第幾個(gè),y為星期幾
last x一個(gè)月中,最后一個(gè)星期的星期幾觸發(fā)
last一個(gè)月中的最后一天觸發(fā)
x, y, z所有組合表達式,可以組合確定值或上述表達式
# 6-8,11-12月第三個(gè)周五 00:00, 01:00, 02:00, 03:00運行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五運行 直到2024-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

Executor 執行器

Executor在scheduler中初始化,另外也可通過(guò)scheduler的add_executor動(dòng)態(tài)添加Executor。每個(gè)executor都會(huì )綁定一個(gè)alias,這個(gè)作為唯一標識綁定到Job,在實(shí)際執行時(shí)會(huì )根據Job綁定的executor找到實(shí)際的執行器對象,然后根據執行器對象執行Job。

Executor的種類(lèi)會(huì )根據不同的調度來(lái)選擇,如果選擇AsyncIO作為調度的庫,那么選擇AsyncIOExecutor,如果選擇tornado作為調度的庫,選擇TornadoExecutor,如果選擇啟動(dòng)進(jìn)程作為調度,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以。

Executor的選擇需要根據實(shí)際的scheduler來(lái)選擇不同的執行器。目前APScheduler支持的Executor:

  • executors.asyncio:同步io,阻塞
  • executors.gevent:io多路復用,非阻塞
  • executors.pool: 線(xiàn)程ThreadPoolExecutor和進(jìn)程ProcessPoolExecutor
  • executors.twisted:基于事件驅動(dòng)

Jobstore 作業(yè)存儲

Jobstore在scheduler中初始化,另外也可通過(guò)scheduler的add_jobstore動(dòng)態(tài)添加Jobstore。每個(gè)jobstore都會(huì )綁定一個(gè)alias,scheduler在A(yíng)dd Job時(shí),根據指定的jobstore在scheduler中找到相應的jobstore,并將job添加到j(luò )obstore中。作業(yè)存儲器決定任務(wù)的保存方式, 默認存儲在內存中(MemoryJobStore),重啟后就沒(méi)有了。APScheduler支持的任務(wù)存儲器有:

  • jobstores.memory:內存
  • jobstores.mongodb:存儲在mongodb
  • jobstores.redis:存儲在redis
  • jobstores.rethinkdb:存儲在rethinkdb
  • jobstores.sqlalchemy:支持sqlalchemy的數據庫如mysql,sqlite等
  • jobstores.zookeeper:zookeeper

不同的任務(wù)存儲器可以在調度器的配置中進(jìn)行配置(見(jiàn)調度器)

Event 事件

Event是APScheduler在進(jìn)行某些操作時(shí)觸發(fā)相應的事件,用戶(hù)可以自定義一些函數來(lái)監聽(tīng)這些事件,當觸發(fā)某些Event時(shí),做一些具體的操作。常見(jiàn)的比如。Job執行異常事件 EVENT_JOB_ERROR。Job執行時(shí)間錯過(guò)事件 EVENT_JOB_MISSED。

目前APScheduler定義的Event:

  • EVENT_SCHEDULER_STARTED
  • EVENT_SCHEDULER_START
  • EVENT_SCHEDULER_SHUTDOWN
  • EVENT_SCHEDULER_PAUSED
  • EVENT_SCHEDULER_RESUMED
  • EVENT_EXECUTOR_ADDED
  • EVENT_EXECUTOR_REMOVED
  • EVENT_JOBSTORE_ADDED
  • EVENT_JOBSTORE_REMOVED
  • EVENT_ALL_JOBS_REMOVED
  • EVENT_JOB_ADDED
  • EVENT_JOB_REMOVED
  • EVENT_JOB_MODIFIED
  • EVENT_JOB_EXECUTED
  • EVENT_JOB_ERROR
  • EVENT_JOB_MISSED
  • EVENT_JOB_SUBMITTED
  • EVENT_JOB_MAX_INSTANCES

Listener表示用戶(hù)自定義監聽(tīng)的一些Event,比如當Job觸發(fā)了EVENT_JOB_MISSED事件時(shí)可以根據需求做一些其他處理。

調度器

Scheduler是APScheduler的核心,所有相關(guān)組件通過(guò)其定義。scheduler啟動(dòng)之后,將開(kāi)始按照配置的任務(wù)進(jìn)行調度。除了依據所有定義Job的trigger生成的將要調度時(shí)間喚醒調度之外。當發(fā)生Job信息變更時(shí)也會(huì )觸發(fā)調度。

APScheduler支持的調度器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler

  • BlockingScheduler:適用于調度程序是進(jìn)程中唯一運行的進(jìn)程,調用start函數會(huì )阻塞當前線(xiàn)程,不能立即返回。
  • BackgroundScheduler:適用于調度程序在應用程序的后臺運行,調用start后主線(xiàn)程不會(huì )阻塞。
  • AsyncIOScheduler:適用于使用了asyncio模塊的應用程序。
  • GeventScheduler:適用于使用gevent模塊的應用程序。
  • TwistedScheduler:適用于構建Twisted的應用程序。
  • QtScheduler:適用于構建Qt的應用程序。

Scheduler的工作流程

Scheduler添加job流程:

Scheduler調度流程:

使用分布式消息系統Celery實(shí)現定時(shí)任務(wù)

Celery是一個(gè)簡(jiǎn)單,靈活,可靠的分布式系統,用于處理大量消息,同時(shí)為操作提供維護此類(lèi)系統所需的工具, 也可用于任務(wù)調度。Celery 的配置比較麻煩,如果你只是需要一個(gè)輕量級的調度工具,Celery 不會(huì )是一個(gè)好選擇。

Celery 是一個(gè)強大的分布式任務(wù)隊列,它可以讓任務(wù)的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來(lái)實(shí)現異步任務(wù)(async task)和定時(shí)任務(wù)(crontab)。異步任務(wù)比如是發(fā)送郵件、或者文件上傳, 圖像處理等等一些比較耗時(shí)的操作 ,定時(shí)任務(wù)是需要在特定時(shí)間執行的任務(wù)。

需要注意,celery本身并不具備任務(wù)的存儲功能,在調度任務(wù)的時(shí)候肯定是要把任務(wù)存起來(lái)的,因此在使用celery的時(shí)候還需要搭配一些具備存儲、訪(fǎng)問(wèn)功能的工具,比如:消息隊列、Redis緩存、數據庫等。官方推薦的是消息隊列RabbitMQ,有些時(shí)候使用Redis也是不錯的選擇。

它的架構組成如下圖:

Celery架構,它采用典型的生產(chǎn)者-消費者模式,主要由以下部分組成:

  • Celery Beat,任務(wù)調度器,Beat進(jìn)程會(huì )讀取配置文件的內容,周期性地將配置中到期需要執行的任務(wù)發(fā)送給任務(wù)隊列。
  • Producer:需要在隊列中進(jìn)行的任務(wù),一般由用戶(hù)、觸發(fā)器或其他操作將任務(wù)入隊,然后交由workers進(jìn)行處理。調用了Celery提供的API、函數或者裝飾器而產(chǎn)生任務(wù)并交給任務(wù)隊列處理的都是任務(wù)生產(chǎn)者。
  • Broker,即消息中間件,在這指任務(wù)隊列本身,Celery扮演生產(chǎn)者和消費者的角色,brokers就是生產(chǎn)者和消費者存放/獲取產(chǎn)品的地方(隊列)。
  • Celery Worker,執行任務(wù)的消費者,從隊列中取出任務(wù)并執行。通常會(huì )在多臺服務(wù)器運行多個(gè)消費者來(lái)提高執行效率。
  • Result Backend:任務(wù)處理完后保存狀態(tài)信息和結果,以供查詢(xún)。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

實(shí)際應用中,用戶(hù)從Web前端發(fā)起一個(gè)請求,我們只需要將請求所要處理的任務(wù)丟入任務(wù)隊列broker中,由空閑的worker去處理任務(wù)即可,處理的結果會(huì )暫存在后臺數據庫backend中。我們可以在一臺機器或多臺機器上同時(shí)起多個(gè)worker進(jìn)程來(lái)實(shí)現分布式地并行處理任務(wù)。

Celery定時(shí)任務(wù)實(shí)例:

  • Python Celery & RabbitMQ Tutorial
  • Celery 配置實(shí)踐筆記

使用數據流工具Apache Airflow實(shí)現定時(shí)任務(wù)

Apache Airflow 是Airbnb開(kāi)源的一款數據流程工具,目前是Apache孵化項目。以非常靈活的方式來(lái)支持數據的ETL過(guò)程,同時(shí)還支持非常多的插件來(lái)完成諸如HDFS監控、郵件通知等功能。Airflow支持單機和分布式兩種模式,支持Master-Slave模式,支持Mesos等資源調度,有非常好的擴展性。被大量公司采用。

Airflow使用Python開(kāi)發(fā),它通過(guò)DAGs(Directed Acyclic Graph, 有向無(wú)環(huán)圖)來(lái)表達一個(gè)工作流中所要執行的任務(wù),以及任務(wù)之間的關(guān)系和依賴(lài)。比如,如下的工作流中,任務(wù)T1執行完成,T2和T3才能開(kāi)始執行,T2和T3都執行完成,T4才能開(kāi)始執行。

Airflow提供了各種Operator實(shí)現,可以完成各種任務(wù)實(shí)現:

  • BashOperator – 執行 bash 命令或腳本。
  • SSHOperator – 執行遠程 bash 命令或腳本(原理同 paramiko 模塊)。
  • PythonOperator – 執行 Python 函數。
  • EmailOperator – 發(fā)送 Email。
  • HTTPOperator – 發(fā)送一個(gè) HTTP 請求。
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執行 SQL 任務(wù)。
  • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

除了以上這些 Operators 還可以方便的自定義 Operators 滿(mǎn)足個(gè)性化的任務(wù)需求。

一些情況下,我們需要根據執行結果執行不同的任務(wù),這樣工作流會(huì )產(chǎn)生分支。如:

這種需求可以使用BranchPythonOperator來(lái)實(shí)現。

Airflow 產(chǎn)生的背景

通常,在一個(gè)運維系統,數據分析系統,或測試系統等大型系統中,我們會(huì )有各種各樣的依賴(lài)需求。包括但不限于:

  • 時(shí)間依賴(lài):任務(wù)需要等待某一個(gè)時(shí)間點(diǎn)觸發(fā)。
  • 外部系統依賴(lài):任務(wù)依賴(lài)外部系統需要調用接口去訪(fǎng)問(wèn)。
  • 任務(wù)間依賴(lài):任務(wù) A 需要在任務(wù) B 完成后啟動(dòng),兩個(gè)任務(wù)互相間會(huì )產(chǎn)生影響。
  • 資源環(huán)境依賴(lài):任務(wù)消耗資源非常多, 或者只能在特定的機器上執行。

crontab 可以很好地處理定時(shí)執行任務(wù)的需求,但僅能管理時(shí)間上的依賴(lài)。Airflow 的核心概念 DAG(有向無(wú)環(huán)圖)—— 來(lái)表現工作流。

  • Airflow 是一種 WMS,即:它將任務(wù)以及它們的依賴(lài)看作代碼,按照那些計劃規范任務(wù)執行,并在實(shí)際工作進(jìn)程之間分發(fā)需執行的任務(wù)。
  • Airflow 提供了一個(gè)用于顯示當前活動(dòng)任務(wù)和過(guò)去任務(wù)狀態(tài)的優(yōu)秀 UI,并允許用戶(hù)手動(dòng)管理任務(wù)的執行和狀態(tài)。
  • Airflow 中的工作流是具有方向性依賴(lài)的任務(wù)集合。
  • DAG 中的每個(gè)節點(diǎn)都是一個(gè)任務(wù),DAG 中的邊表示的是任務(wù)之間的依賴(lài)(強制為有向無(wú)環(huán),因此不會(huì )出現循環(huán)依賴(lài),從而導致無(wú)限執行循環(huán))。

Airflow 核心概念

  • DAGs:即有向無(wú)環(huán)圖(Directed Acyclic Graph),將所有需要運行的tasks按照依賴(lài)關(guān)系組織起來(lái),描述的是所有tasks執行順序。
  • Operators:可以簡(jiǎn)單理解為一個(gè)class,描述了DAG中某個(gè)的task具體要做的事。其中,airflow內置了很多operators,如BashOperator 執行一個(gè)bash 命令,PythonOperator 調用任意的Python 函數,EmailOperator 用于發(fā)送郵件,HTTPOperator 用于發(fā)送HTTP請求, SqlOperator 用于執行SQL命令等等,同時(shí),用戶(hù)可以自定義Operator,這給用戶(hù)提供了極大的便利性。
  • Tasks:Task 是 Operator的一個(gè)實(shí)例,也就是DAGs中的一個(gè)node。
  • Task Instance:task的一次運行。Web 界面中可以看到task instance 有自己的狀態(tài),包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
  • Task Relationships:DAGs中的不同Tasks之間可以有依賴(lài)關(guān)系,如 Task1 >> Task2,表明Task2依賴(lài)于Task2了。通過(guò)將DAGs和Operators結合起來(lái),用戶(hù)就可以創(chuàng )建各種復雜的 工作流(workflow)。

Airflow 的架構

在一個(gè)可擴展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:

  • 元數據庫:這個(gè)數據庫存儲有關(guān)任務(wù)狀態(tài)的信息。
  • 調度器:Scheduler 是一種使用 DAG 定義結合元數據中的任務(wù)狀態(tài)來(lái)決定哪些任務(wù)需要被執行以及任務(wù)執行優(yōu)先級的過(guò)程。調度器通常作為服務(wù)運行。
  • 執行器:Executor 是一個(gè)消息隊列進(jìn)程,它被綁定到調度器中,用于確定實(shí)際執行每個(gè)任務(wù)計劃的工作進(jìn)程。有不同類(lèi)型的執行器,每個(gè)執行器都使用一個(gè)指定工作進(jìn)程的類(lèi)來(lái)執行任務(wù)。例如,LocalExecutor 使用與調度器進(jìn)程在同一臺機器上運行的并行進(jìn)程執行任務(wù)。其他像 CeleryExecutor 的執行器使用存在于獨立的工作機器集群中的工作進(jìn)程執行任務(wù)。
  • Workers:這些是實(shí)際執行任務(wù)邏輯的進(jìn)程,由正在使用的執行器確定。

Worker的具體實(shí)現由配置文件中的executor來(lái)指定,airflow支持多種Executor:

  • SequentialExecutor: 單進(jìn)程順序執行,一般只用來(lái)測試
  • LocalExecutor: 本地多進(jìn)程執行
  • CeleryExecutor: 使用Celery進(jìn)行分布式任務(wù)調度
  • DaskExecutor:使用Dask進(jìn)行分布式任務(wù)調度
  • KubernetesExecutor: 1.10.0新增, 創(chuàng )建臨時(shí)POD執行每次任務(wù)

生產(chǎn)環(huán)境一般使用CeleryExecutor和KubernetesExecutor。

使用CeleryExecutor的架構如圖:

使用KubernetesExecutor的架構如圖:

其它參考:

  • Getting started with Apache Airflow
  • Understanding Apache Airflow’s key concepts
本站僅提供存儲服務(wù),所有內容均由用戶(hù)發(fā)布,如發(fā)現有害或侵權內容,請點(diǎn)擊舉報。
打開(kāi)APP,閱讀全文并永久保存 查看更多類(lèi)似文章
猜你喜歡
類(lèi)似文章
Python任務(wù)調度神器:APScheduler使用詳解
Python使用APScheduler實(shí)現定時(shí)任務(wù)
基于A(yíng)SP.NET MVC(C#)和Quartz.Net組件實(shí)現的定時(shí)執行任務(wù)調度
python不要再使用while死循環(huán),使用定時(shí)器代替效果更佳!
定時(shí)執行執行任務(wù)-APScheduler的全稱(chēng)是Advanced Python Scheduler。
Python實(shí)現定時(shí)任務(wù)的利器apscheduler
更多類(lèi)似文章 >>
生活服務(wù)
分享 收藏 導長(cháng)圖 關(guān)注 下載文章
綁定賬號成功
后續可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服

欧美性猛交XXXX免费看蜜桃,成人网18免费韩国,亚洲国产成人精品区综合,欧美日韩一区二区三区高清不卡,亚洲综合一区二区精品久久