日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

利用 Celery 构建 Web 服务的后台任务调度模块

發(fā)布時(shí)間:2024/7/23 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 利用 Celery 构建 Web 服务的后台任务调度模块 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

來(lái)源:http://www.tuicool.com/articles/Enaeymm


任務(wù)隊(duì)列在 Web 服務(wù)里的應(yīng)用

在 Web2.0 后的時(shí)代,社交網(wǎng)站、搜索引擎的的迅猛發(fā)展對(duì) Web 服務(wù)的后臺(tái)管理系統(tǒng)提出了更高的需求。考慮幾個(gè)常見(jiàn)的使用場(chǎng)景:

  • 社交網(wǎng)站的用戶在其主頁(yè)發(fā)布了一組新的照片,這條新鮮事需要適時(shí)地推送至該用戶的所有好友。該網(wǎng)站的活躍用戶有千萬(wàn)級(jí)別,在同一時(shí)刻會(huì)有非常多的“新鮮事推送”任務(wù)需要處理,并且每個(gè)用戶的好友數(shù)會(huì)達(dá)到 1000+的級(jí)別。出于用戶體驗(yàn)的考慮,用戶發(fā)布照片的這個(gè)操作需要在較短時(shí)間內(nèi)得到反饋。
  • 在文獻(xiàn)搜索系統(tǒng)的主頁(yè),用戶可以查到當(dāng)前一小時(shí)內(nèi)最熱門(mén)的十大文獻(xiàn),并且能夠直接訪問(wèn)該文獻(xiàn)。該文獻(xiàn)管理系統(tǒng)所管理的文獻(xiàn)數(shù)量非常多,達(dá)到 PB 的級(jí)別。處于用戶體驗(yàn)的考慮,用戶獲得十大熱門(mén)文獻(xiàn)這個(gè)動(dòng)作需要在較短時(shí)間內(nèi)獲得反饋。
  • 考慮對(duì)于高并發(fā)大用戶量的 Web 服務(wù)系統(tǒng),對(duì)于場(chǎng)景一和場(chǎng)景二中的需求,如果在請(qǐng)求處理周期內(nèi)完成這些任務(wù),然后再返回結(jié)果,這種傳統(tǒng)的做法會(huì)導(dǎo)致用戶等待的時(shí)間過(guò)長(zhǎng)。同時(shí) Web 服務(wù)管理后臺(tái)對(duì)任務(wù)處理能力也缺乏擴(kuò)展性。

    在這種場(chǎng)景下,任務(wù)隊(duì)列是有效的解決方案。在一個(gè)任務(wù)隊(duì)列系統(tǒng)中,“將新鮮事推送至用戶 A 的所有好友”或者“查詢當(dāng)前最熱門(mén)的十大文獻(xiàn)”這種查詢或者計(jì)算工作可以被當(dāng)成一個(gè)“任務(wù)”。在任務(wù)隊(duì)列系統(tǒng)中,一般有任務(wù)生產(chǎn)者、任務(wù)處理中間方以及任務(wù)消費(fèi)者三方。其中任務(wù)生產(chǎn)者負(fù)責(zé)生產(chǎn)任務(wù),比如“將新鮮事推送至用戶 A 的所有好友”這一任務(wù)的發(fā)起方就可以稱(chēng)作任務(wù)生產(chǎn)者。任務(wù)處理中間方負(fù)責(zé)接收任務(wù)生產(chǎn)者的任務(wù)處理請(qǐng)求,對(duì)任務(wù)進(jìn)行調(diào)度,最后將任務(wù)分發(fā)給任務(wù)消費(fèi)者來(lái)進(jìn)行處理。任務(wù)消費(fèi)者就是執(zhí)行任務(wù)的一方,它負(fù)責(zé)接收任務(wù)處理中間方發(fā)來(lái)的任務(wù)處理請(qǐng)求,完成這些任務(wù),并且返回任務(wù)處理的結(jié)果。在生產(chǎn)方、消費(fèi)者和任務(wù)處理中間方之間一般使用消息傳遞的方式來(lái)進(jìn)行通信。

    在任務(wù)隊(duì)列系統(tǒng)框架中,任務(wù)消費(fèi)者可以跨越不同的服務(wù)節(jié)點(diǎn),可以動(dòng)態(tài)地增加節(jié)點(diǎn)來(lái)增加系統(tǒng)的任務(wù)處理能力,非常適合高并發(fā)、需要橫向擴(kuò)展的 Web 服務(wù)后臺(tái)。

    回頁(yè)首

    Celery: 基于 Python 的開(kāi)源分布式任務(wù)調(diào)度模塊

    Celery 是一個(gè)用 Python 編寫(xiě)的分布式的任務(wù)調(diào)度模塊,它有著簡(jiǎn)明的 API,并且有豐富的擴(kuò)展性,適合用于構(gòu)建分布式的 Web 服務(wù)。

    圖 1. Celery 的模塊架構(gòu)


    Celery 的模塊架構(gòu)較為簡(jiǎn)潔,但是提供了較為完整的功能:

    任務(wù)生產(chǎn)者 (task producer)

    任務(wù)生產(chǎn)者 (task producer) 負(fù)責(zé)產(chǎn)生計(jì)算任務(wù),交給任務(wù)隊(duì)列去處理。在 Celery 里,一段獨(dú)立的 Python 代碼、一段嵌入在 Django Web 服務(wù)里的一段請(qǐng)求處理邏輯,只要是調(diào)用了 Celery 提供的 API,產(chǎn)生任務(wù)并交給任務(wù)隊(duì)列處理的,我們都可以稱(chēng)之為任務(wù)生產(chǎn)者。

    任務(wù)調(diào)度器 (celery beat)

    Celery beat 是一個(gè)任務(wù)調(diào)度器,它以獨(dú)立進(jìn)程的形式存在。Celery beat 進(jìn)程會(huì)讀取配置文件的內(nèi)容,周期性地將執(zhí)行任務(wù)的請(qǐng)求發(fā)送給任務(wù)隊(duì)列。Celery beat 是 Celery 系統(tǒng)自帶的任務(wù)生產(chǎn)者。系統(tǒng)管理員可以選擇關(guān)閉或者開(kāi)啟 Celery beat。同時(shí)在一個(gè) Celery 系統(tǒng)中,只能存在一個(gè) Celery beat 調(diào)度器。

    任務(wù)代理 (broker)

    任務(wù)代理方負(fù)責(zé)接受任務(wù)生產(chǎn)者發(fā)送過(guò)來(lái)的任務(wù)處理消息,存進(jìn)隊(duì)列之后再進(jìn)行調(diào)度,分發(fā)給任務(wù)消費(fèi)方 (celery worker)。因?yàn)槿蝿?wù)處理是基于 message(消息) 的,所以我們一般選擇 RabbitMQ、Redis 等消息隊(duì)列或者數(shù)據(jù)庫(kù)作為 Celery 的 message broker。

    任務(wù)消費(fèi)方 (celery worker)

    Celery worker 就是執(zhí)行任務(wù)的一方,它負(fù)責(zé)接收任務(wù)處理中間方發(fā)來(lái)的任務(wù)處理請(qǐng)求,完成這些任務(wù),并且返回任務(wù)處理的結(jié)果。Celery worker 對(duì)應(yīng)的就是操作系統(tǒng)中的一個(gè)進(jìn)程。Celery 支持分布式部署和橫向擴(kuò)展,我們可以在多個(gè)節(jié)點(diǎn)增加 Celery worker 的數(shù)量來(lái)增加系統(tǒng)的高可用性。在分布式系統(tǒng)中,我們也可以在不同節(jié)點(diǎn)上分配執(zhí)行不同任務(wù)的 Celery worker 來(lái)達(dá)到模塊化的目的。

    結(jié)果保存

    Celery 支持任務(wù)處理完后將狀態(tài)信息和結(jié)果的保存,以供查詢。Celery 內(nèi)置支持 rpc, Django ORM,Redis,RabbitMQ 等方式來(lái)保存任務(wù)處理后的狀態(tài)信息。

    回頁(yè)首

    構(gòu)建第一個(gè) Celery 程序

    在我們的第一個(gè) Celery 程序中,我們嘗試在 Celery 中構(gòu)建一個(gè)“將新鮮事通知到朋友”的任務(wù),并且嘗試通過(guò)編寫(xiě)一個(gè) Python 程序來(lái)啟動(dòng)這個(gè)任務(wù)。

    安裝 Celery

    Pip install celery

    選擇合適的消息代理中間件

    Celery 支持 RabbitMQ、Redis 甚至其他數(shù)據(jù)庫(kù)系統(tǒng)作為其消息代理中間件,在本文中,我們選擇 RabbitMQ 作為消息代理中間件。

    sudo apt-get install rabbitmq-server

    創(chuàng)建 Celery 對(duì)象

    Celery 對(duì)象是所有 Celery 功能的入口,所以在開(kāi)始其它工作之前,我們必須先定義我們自己的 Celery 對(duì)象。該對(duì)象定義了任務(wù)的具體內(nèi)容、任務(wù)隊(duì)列的服務(wù)地址、以及保存任務(wù)執(zhí)行結(jié)果的地址等重要信息。

    # notify_friends.py from celery import Celery import time app = Celery('notify_friends', backend='rpc://', broker='amqp://localhost')@app.task def notify_friends(userId, newsId):print 'Start to notify_friends task at {0}, userID:{1} newsID:{2}'.format(time.ctime(), userId, newsId)time.sleep(2)print 'Task notify_friends succeed at {0}'.format(time.ctime())return True

    在本文中,為了模擬真實(shí)的應(yīng)用場(chǎng)景,我們定義了 notify_friends 這個(gè)任務(wù),它接受兩個(gè)參數(shù),并且在輸出流中打印出一定的信息,

    創(chuàng)建 Celery Worker 服務(wù)進(jìn)程

    在定義完 Celery 對(duì)象后,我們可以創(chuàng)建對(duì)應(yīng)的任務(wù)消費(fèi)者--Celery worker 進(jìn)程,后續(xù)的任務(wù)處理請(qǐng)求都是由這個(gè) Celery worker 進(jìn)程來(lái)最終執(zhí)行的。

    celery -A celery_test worker --loglevel=info

    在 Python 程序中調(diào)用 Celery Task

    我們創(chuàng)建一個(gè)簡(jiǎn)單的 Python 程序,來(lái)觸發(fā) notify_friends 這個(gè)任務(wù)。

    # call_notify_friends.pyfrom notify_friends import notify_friends import timedef notify(userId, messageId):result = notify_friends.delay(userId, messageId)while not result.ready():time.sleep(1)print result.get(timeout=1)if __name__ == '__main__':notify('001', '001')

    我們?cè)?call_notify_friends.py 這個(gè)程序文件中,定義了 Notify 函數(shù),它調(diào)用了我們之前定義的 notify_friends 這個(gè) API,來(lái)發(fā)送任務(wù)處理請(qǐng)求到任務(wù)隊(duì)列,并且不斷地查詢等待來(lái)獲得任務(wù)處理的結(jié)果。

    Celery worker 中的 log 信息:

    [tasks]. celery_test.notify_friends[2015-11-16 15:02:31,113: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2015-11-16 15:02:31,122: INFO/MainProcess] mingle: searching for neighbors [2015-11-16 15:02:32,142: INFO/MainProcess] mingle: all alone [2015-11-16 15:02:32,179: WARNING/MainProcess] celery@yuwenhao-VirtualBox ready. [2015-11-16 15:04:45,474: INFO/MainProcess] Received task: celery_test.notify_friends[3f090a76-7678-4f9c-a37b-ceda59600f9c] [2015-11-16 15:04:45,475: WARNING/Worker-2] Start to notify_friends task at Mon Nov 16 15:04:45 2015, userID:001 newsID:001 [2015-11-16 15:04:47,477: WARNING/Worker-2] Task notify_friends succeed at Mon Nov 16 15:04:47 2015 [2015-11-16 15:04:47,511: INFO/MainProcess] Task celery_test.notify_friends[3f090a76-7678-4f9c-a37b-ceda59600f9c] succeeded in 2.035536565s: True

    我們可以看到,Celery worker 收到了 Python 程序的 notify_friends 任務(wù)的處理請(qǐng)求,并且執(zhí)行完畢。

    回頁(yè)首

    利用調(diào)度器創(chuàng)建周期任務(wù)

    在我們第二個(gè) Celery 程序中,我們嘗試構(gòu)建一個(gè)周期性執(zhí)行“查詢當(dāng)前一小時(shí)最熱門(mén)文獻(xiàn)”的任務(wù),每隔 100 秒執(zhí)行一次,并將結(jié)果保存起來(lái)。后續(xù)的搜索請(qǐng)求到來(lái)后可以直接返回已有的結(jié)果,極大優(yōu)化了用戶體驗(yàn)。

    創(chuàng)建配置文件

    Celery 的調(diào)度器的配置是在 CELERYBEAT_SCHEDULE 這個(gè)全局變量上配置的,我們可以將配置寫(xiě)在一個(gè)獨(dú)立的 Python 模塊,在定義 Celery 對(duì)象的時(shí)候加載這個(gè)模塊。我們將 select_populate_book 這個(gè)任務(wù)定義為每 100 秒執(zhí)行一次。

    # config.py from datetime import timedeltaCELERYBEAT_SCHEDULE = {'select_populate_book': {'task': 'favorite_book.select_populate_book','schedule': timedelta(seconds=100),}, }

    創(chuàng)建 Celery 對(duì)象

    在 Celery 對(duì)象的定義里,我們加載了之前定義的配置文件,并定義了 select_populate_book 這個(gè)任務(wù)。

    #favorite_book.py from celery import Celery import timeapp = Celery('select_populate_book', backend='rpc://', broker='amqp://localhost') app.config_from_object('config')@app.task def select_populate_book():print 'Start to select_populate_book task at {0}'.format(time.ctime())time.sleep(2)print 'Task select_populate_book succeed at {0}'.format(time.ctime())return True

    啟動(dòng) Celery worker

    celery -A favorite_book worker --loglevel=info

    啟動(dòng) Celery beat

    啟動(dòng) Celery beat 調(diào)度器,Celery beat 會(huì)周期性地執(zhí)行在 CELERYBEAT_SCHEDULE 中定義的任務(wù),即周期性地查詢當(dāng)前一小時(shí)最熱門(mén)的書(shū)籍。

    celery -A favorite_book beatyuwenhao@yuwenhao:~$ celery -A favorite_book beat celery beat v3.1.15 (Cipater) is starting. __ - ... __ - _ Configuration ->. broker -> amqp://guest:**@localhost:5672//. loader -> celery.loaders.app.AppLoader. scheduler -> celery.beat.PersistentScheduler. db -> celerybeat-schedule. logfile -> [stderr]@%INFO. maxinterval -> now (0s) [2015-11-16 16:21:15,443: INFO/MainProcess] beat: Starting... [2015-11-16 16:21:15,447: WARNING/MainProcess] Reset: Timezone changed from 'UTC' to None [2015-11-16 16:21:25,448: INFO/MainProcess] Scheduler: Sending due task select_populate_book (favorite_book.select_populate_book) [2015-11-16 16:21:35,485: INFO/MainProcess] Scheduler: Sending due task select_populate_book (favorite_book.select_populate_book) [2015-11-16 16:21:45,490: INFO/MainProcess] Scheduler: Sending due task select_populate_book (favorite_book.select_populate_book)

    我們可以看到,Celery beat 進(jìn)程周期性地將任務(wù)執(zhí)行請(qǐng)求 select_populate_book 發(fā)送至任務(wù)隊(duì)列。

    yuwenhao@yuwenhao:~$ celery -A favorite_book worker --loglevel=info [2015-11-16 16:21:11,560: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/celery/apps/worker.py:161: CDeprecationWarning: Starting from version 3.2 Celery will refuse to accept pickle by default.The pickle serializer is a security concern as it may give attackers the ability to execute any command. It's important to secure your broker from unauthorized access when using pickle, so we think that enabling pickle should require a deliberate action and not be the default choice.If you depend on pickle then you should set a setting to disable this warning and to be sure that everything will continue working when you upgrade to Celery 3.2::CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']You must only enable the serializers that you will actually use.warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))-------------- celery@yuwenhao-VirtualBox v3.1.15 (Cipater) ---- **** ----- --- * *** * -- Linux-3.5.0-23-generic-x86_64-with-Ubuntu-12.04-precise -- * - **** --- - ** ---------- [config] - ** ---------- .> app: select_populate_book:0x1b219d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: rpc:// - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- --- ***** ----- [queues]-------------- .> celery exchange=celery(direct) key=celery[tasks]. favorite_book.select_populate_book[2015-11-16 16:21:11,579: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2015-11-16 16:21:11,590: INFO/MainProcess] mingle: searching for neighbors [2015-11-16 16:21:12,607: INFO/MainProcess] mingle: all alone [2015-11-16 16:21:12,631: WARNING/MainProcess] celery@yuwenhao-VirtualBox ready. [2015-11-16 16:21:25,459: INFO/MainProcess] Received task: favorite_book.select_populate_book[515f7c55-7ff0-4fcf-bc40-8838f69805fd] [2015-11-16 16:21:25,460: WARNING/Worker-2] Start to select_populate_book task at Mon Nov 16 16:21:25 2015 [2015-11-16 16:21:27,462: WARNING/Worker-2] Task select_populate_book succeed at Mon Nov 16 16:21:27 2015 [2015-11-16 16:21:27,475: INFO/MainProcess] Task favorite_book.select_populate_book [515f7c55-7ff0-4fcf-bc40-8838f69805fd] succeeded in 2.015802141s: True [2015-11-16 16:21:35,494: INFO/MainProcess] Received task: favorite_book.select_populate_book[277d718a-3435-4bca-a881-a8f958d64aa9] [2015-11-16 16:21:35,498: WARNING/Worker-1] Start to select_populate_book task at Mon Nov 16 16:21:35 2015 [2015-11-16 16:21:37,501: WARNING/Worker-1] Task select_populate_book succeed at Mon Nov 16 16:21:37 2015 [2015-11-16 16:21:37,511: INFO/MainProcess] Task favorite_book.select_populate_book [277d718a-3435-4bca-a881-a8f958d64aa9] succeeded in 2.014368786s: True

    我們可以看到,任務(wù) select_populate_book 的 Celery worker 周期性地收到 Celery 調(diào)度器的任務(wù)的處理請(qǐng)求,并且運(yùn)行該任務(wù)。

    回頁(yè)首

    結(jié)束語(yǔ)

    任務(wù)隊(duì)列技術(shù)可以滿足 Web 服務(wù)系統(tǒng)后臺(tái)任務(wù)管理和調(diào)度的需求,適合構(gòu)建分布式的 Web 服務(wù)系統(tǒng)后臺(tái)。Celery 是一個(gè)基于 Python 的開(kāi)源任務(wù)隊(duì)列系統(tǒng)。它有著簡(jiǎn)明的 API 以及良好的擴(kuò)展性。本文首先介紹了隊(duì)列技術(shù)的基本原理,然后介紹了 Celery 的模塊架構(gòu)以及工作原理。最后,本文通過(guò)實(shí)例介紹了如何在 Python 程序中調(diào)用 Celery API 并通過(guò) Celery 任務(wù)隊(duì)列來(lái)執(zhí)行任務(wù),以及如何通過(guò) Celery beat 在 Celery 任務(wù)隊(duì)列中創(chuàng)建周期性執(zhí)行的任務(wù)。希望本文可以對(duì) Web 后臺(tái)開(kāi)發(fā)者、以及 Celery 的初學(xué)者有所幫助。



    總結(jié)

    以上是生活随笔為你收集整理的利用 Celery 构建 Web 服务的后台任务调度模块的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。