This article walks through how to implement cluster management with celery: a task-publishing node sends each task to a specific worker node by way of that node's own queue.
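Architecture: one task-publishing node (workgroup0, 172.16.77.175) and two worker nodes, docker1 (workgroup1) and docker2 (workgroup2).
The celery app used as the example here is myapp:
root@workgroup0:~/celeryapp# ls myapp
agent.py  celery.py  config.py  __init__.py
root@workgroup0:~/celeryapp#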
Shared code:
celery.py: (note: 172.16.77.175 is the IP address of the task-publishing node)
from __future__ import absolute_import

from celery import Celery

# Broker and result backend both point at the task-publishing node.
app = Celery('myapp',
             broker='amqp://guest@172.16.77.175//',
             backend='amqp://guest@172.16.77.175//',
             include=['myapp.agent'])

app.config_from_object('myapp.config')

if __name__ == '__main__':
    app.start()
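config.py:
from __future__ import absolute_import

from kombu import Queue, Exchange
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'

CELERY_DEFAULT_EXCHANGE = 'agent'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'

# One queue per worker machine, each bound to the direct exchange 'agent'.
# The original listing spelled this name CELERT_QUEUES, which Celery ignores;
# that is likely why the worker banners further down show
# exchange=machine1 / exchange=machine2 (auto-created queues) rather than
# exchange=agent.
CELERY_QUEUES = (
    Queue('machine1', exchange='agent', routing_key='machine1'),
    Queue('machine2', exchange='agent', routing_key='machine2'),
)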
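To make the routing idea in config.py concrete, here is a minimal sketch of how a direct exchange picks a queue: a message goes only to queues whose binding key equals the message's routing key exactly. The `DirectExchange` class is a toy model written for illustration; it is not part of Celery or kombu, and a real broker does considerably more.

```python
from collections import defaultdict


class DirectExchange(object):
    """Toy model of an AMQP direct exchange (hypothetical, for illustration)."""

    def __init__(self, name):
        self.name = name
        self._bindings = defaultdict(list)  # routing key -> bound queue names

    def bind(self, queue_name, routing_key):
        # Register a queue under the given binding key.
        self._bindings[routing_key].append(queue_name)

    def route(self, routing_key):
        # Direct exchanges match the routing key exactly; no wildcards.
        return list(self._bindings.get(routing_key, []))


# Mirror the two queues declared in config.py.
exchange = DirectExchange('agent')
exchange.bind('machine1', routing_key='machine1')
exchange.bind('machine2', routing_key='machine2')

to_machine1 = exchange.route('machine1')   # only the machine1 queue
to_nowhere = exchange.route('machine3')    # no binding, so no queue
```

Because each worker consumes from exactly one of these queues, choosing the routing key amounts to choosing the machine.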
__init__.py: (empty)
agent.py on the task-publishing node:
from __future__ import absolute_import

from myapp.celery import app


@app.task
def add(x, y):
    return {'the value is ': str(x + y)}


@app.task
def writefile():
    # The with block closes the file even if the write fails.
    with open('/tmp/data.txt', 'w') as out:
        out.write('hello' + '\n')


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)


@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)
agent.py on docker1:
from __future__ import absolute_import

from myapp.celery import app


@app.task
def add(x, y):
    # node_name is added so the caller can tell which node ran the task.
    return {'value': str(x + y), 'node_name': 'docker1'}


@app.task
def writefile():
    with open('/tmp/data.txt', 'w') as out:
        out.write('hello' + '\n')


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)


@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)
agent.py on docker2:
from __future__ import absolute_import

from myapp.celery import app


@app.task
def add(x, y):
    return {'value': str(x + y), 'node_name': 'docker2'}


@app.task
def writefile():
    with open('/tmp/data.txt', 'w') as out:
        out.write('hello' + '\n')


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)


@app.task
def getl(stri):
    return getlength(stri)


def getlength(stri):
    return len(stri)
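The agent.py files on docker1 and docker2 differ only in the hard-coded node_name. As a sketch of one possible alternative (not what the setup in this article does), the name could be read from the host at run time, so the same file could be copied to every node. The helper name `add_with_node` is hypothetical, and note that `socket.gethostname()` would report the machine's hostname (for example workgroup1.hzg.com) rather than the label docker1.

```python
import socket


def add_with_node(x, y, node_name=None):
    """Return the sum plus the name of the node that computed it.

    node_name defaults to this machine's hostname; passing it explicitly
    keeps the function easy to test.
    """
    if node_name is None:
        node_name = socket.gethostname()
    return {'value': str(x + y), 'node_name': node_name}


# Same shape as the result returned by add() on docker1.
result = add_with_node(122, 34, node_name='docker1')
```

In a real agent.py the body of `add_with_node` would sit under `@app.task`, as the add() task does.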
In this example I only test the add() function.
Start the worker on the docker1 node (-Q specifies the queue to listen on):
root@workgroup1:~/celeryapp# celery -A myapp worker -l info -Q machine1
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@workgroup1.hzg.com v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f472d73f190
- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//
- ** ---------- .> results:     amqp://guest@172.16.77.175//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> machine1         exchange=machine1(direct) key=machine1

[tasks]
  . myapp.agent.add
  . myapp.agent.getl
  . myapp.agent.mul
  . myapp.agent.writefile
  . myapp.agent.xsum

[2015-10-18 15:07:51,313: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//
[2015-10-18 15:07:51,340: INFO/MainProcess] mingle: searching for neighbors
[2015-10-18 15:07:52,372: INFO/MainProcess] mingle: sync with 1 nodes
[2015-10-18 15:07:52,374: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:07:52,423: WARNING/MainProcess] celery@workgroup1.hzg.com ready.
Start the worker on docker2:
root@workgroup2:~/celeryapp# celery -A myapp worker -l info -Q machine2
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@workgroup2.hzg.com v3.1.18 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f708cb8ec10
- ** ---------- .> transport:   amqp://guest:**@172.16.77.175:5672//
- ** ---------- .> results:     amqp://guest@172.16.77.175//
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> machine2         exchange=machine2(direct) key=machine2

[tasks]
  . myapp.agent.add
  . myapp.agent.getl
  . myapp.agent.mul
  . myapp.agent.writefile
  . myapp.agent.xsum

[2015-10-18 15:08:52,114: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//
[2015-10-18 15:08:52,144: INFO/MainProcess] mingle: searching for neighbors
[2015-10-18 15:08:53,174: INFO/MainProcess] mingle: sync with 1 nodes
[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
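The two worker invocations differ only in the queue passed to -Q. A small sketch of that one-queue-per-node mapping follows; it only builds the argument lists and does not start anything. The helper `worker_command` and the `NODE_QUEUES` table are hypothetical names, and the flags are the ones used in the commands above.

```python
# Which queue each worker node listens on (taken from the commands above).
NODE_QUEUES = [('docker1', 'machine1'), ('docker2', 'machine2')]


def worker_command(app, queue, loglevel='info'):
    """Build the argv list for starting a worker bound to one queue."""
    return ['celery', '-A', app, 'worker', '-l', loglevel, '-Q', queue]


commands = dict((node, worker_command('myapp', queue))
                for node, queue in NODE_QUEUES)

# The command line for docker1, as a single string.
docker1_cmdline = ' '.join(commands['docker1'])
```

On each node, a list like this could be handed to something such as `subprocess.call` from that node's ~/celeryapp directory.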
On the task-publishing node, publish a computation task to docker1:
root@workgroup0:~/celeryapp# ls
default.etcd  hots.sh  hotswap.py  myapp  myapp1tmp  people.db  resp  sora  test.py
root@workgroup0:~/celeryapp# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from myapp.agent import add
>>> res = add.apply_async(args=[122,34],queue='machine1',routing_key='machine1')
>>> res.get()
{u'value': u'156', u'node_name': u'docker1'}
get() shows the result returned from docker1. Now look at the worker output on docker1:
[2015-10-18 15:11:51,217: INFO/MainProcess] Task myapp.agent.add[c487a9a2-e5cc-462b-a131-784b363a1952] succeeded in 0.03602907s: {'value': '156', 'node_name': 'docker1'}
As for docker2, nothing happened; its log has not moved:
[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
Publish a task to docker2:
>>> res = add.apply_async(args=[1440,900],queue='machine2',routing_key='machine2')
>>> res.get()
{u'value': u'2340', u'node_name': u'docker2'}
>>>
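Both calls above pass the same name as queue and routing_key. A small helper can keep the two in sync and reject machine names that have no queue. This is only a sketch: `routing_options` and `KNOWN_MACHINES` are hypothetical names, and the helper merely builds the keyword arguments that were passed to apply_async in the session above.

```python
# Queue names declared in config.py; extend this when nodes are added.
KNOWN_MACHINES = ('machine1', 'machine2')


def routing_options(machine):
    """Return the apply_async keyword arguments that target one machine."""
    if machine not in KNOWN_MACHINES:
        raise ValueError('unknown machine queue: %r' % (machine,))
    return {'queue': machine, 'routing_key': machine}


options = routing_options('machine2')

# With the article's task this would be used as:
#   res = add.apply_async(args=[1440, 900], **options)
```

Checking the name before publishing helps because a task sent to a queue that no worker consumes would simply wait, and res.get() would block.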