Celery分布式任务
celery call把任务给一个组件,组件交给rabiitmq放到队列broker,队列返回任务id给celery组件再给call,
任务完成时call拿着id通过celery去rabbitmq取。broker发任务给worker1.Celery有以下优点:
简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务 快速:一个单进程的celery每分钟可处理上百万个任务 灵活: 几乎celery的各个组件都可以被扩展及自定制2.Celery安装使用
Celery的默认broker是RabbitMQ, 仅需配置一行就可以 broker_url = 'amqp://guest:guest@localhost:5672//' 使用Redis做broker也可以 安装redis组件 pip install -U "celery[redis]" app.conf.broker_url = 'redis://localhost:6379/0' redis://:password@hostname:port/db_number 配置一下把任务结果存在哪:app.conf.result_backend = 'redis://localhost:6379/0'3.开始使用Celery啦 pip install celery 软链:ln -s /usr/local/python3/bin/celery /usr/bin/celery 移除软链:rm + name4.简单任务 创建一个任务文件 from celery import Celeryapp = Celery('tasks', # app的名字
broker='redis://:admin@192.168.80.128:6379/0') # backend='redis://localhost')@app.task
def add(x, y): # 加装饰器,这是worker可以执行的一个任务 print("running...", x, y) return x + y 启动Celery Worker来开始监听并执行任务 celery -A tasks worker --loglevel=info 注:window上报错,pip install eventlet celery -A <mymodule> worker -l info -P eventlet 调用任务 再打开一个终端, 进行命令行模式,调用任务 from tasks import add add.delay(4, 4) result = add.delay(4, 4) #赋值变量后能接收值 发布任务后,多个worker时只能由其中一个抢到并执行,linux和window的worker一样 任务状态: result.ready() 取结果时,设置超时时间 result.get(timeout=1) #如果还没处理完,会报错 propagate 参数覆盖get的报错,而是返回任务的报错 result.get(propagate=False) 回撤 result.traceback二、在项目中使用celery 1.目录格式 proj/__init__.py /celery.py #配置连接 /tasks.py 2. proj/celery.py内容 from __future__ import absolute_import, unicode_literals # 默认导入当前同名文件,这里导入后从python包的绝对路径导入,支持unicode兼容 from celery import Celeryapp = Celery('proj',
broker='redis://:admin@192.168.80.128:6379/0', backend='redis://:admin@192.168.80.128:6379/0', include=['celery_project.tasks']) # 要执行的任务文件模板位置# Optional configuration, see the application user guide.
app.conf.update( result_expires=3600, )if __name__ == '__main__':
app.start() 3. proj/tasks.py中的内容 from __future__ import absolute_import, unicode_literals from .celery import app # .表示不绝对导入 @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers) 4.启动celery的worker elery -A celery_project worker -l info -P eventlet 5.发布任务 进入项目所在文件夹,cmd导入运行 # 这时不能从文件导入函数,只能在上一层文件夹导入文件,相对路径.xxx不能作为函数主入口 from celery_project import tasks t = tasks.add.delay(2,3) t.get() 6.后台进程(仅linux) 启动 celery multi start w1 -A proj -l info 停止 celery multi stop w1 -A proj -l info celery multi stopwait w1 -A proj -l info # 等待任务结束才停止 查看进程 ps -ef |grep celery三、celery定时任务 celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行, 这个定时任务模块叫celery beat celery beat会循环监听定时任务,负责交给worker 1.编写定时任务,加入app的include from __future__ import absolute_import, unicode_literals from .celery import app # .表示不绝对导入 from celery.schedules import crontab@app.on_after_configure.connect # 连接后启动任务
def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # 每隔10秒执行,.s相当于delay传参# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10) # 结果保存10秒# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), # crontab是linux的定时模块 crontab -e 编写定时文件 分钟 小时 日 月 星期几 command test.s('Happy Mondays!'), ) @app.task def test(arg): print('run fun',arg) 2.启动celery beat celery -A celery_project.periodic beat -l info # 在根目录执行,或者理解为任务的命名 # beat就会根据定时时间wake up来把任务交给worker # 指定beat最后运行时间要存储在哪个位置,默认当前目录 celery -A celery_project.periodic beat -s /home/celery/var/run/celerybeat-schedule 配置文件添加定时任务 app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'celery_project.tasks.add', # 一个app下该任务的命名 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC' 更复杂的定时配置 用crontab功能,跟linux自带的crontab功能是一样的,可以个性化定制任务执行时间 from celery.schedules import crontab app.conf.beat_schedule = { # Executes every Monday morning at 7:30 a.m. 'add-every-monday-morning': { 'task': 'celery_project.tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, } 四、与django结合使用 django 可以轻松跟celery结合实现异步任务,只需简单配置即可 1.定义celery项目实例 from __future__ import absolute_import, unicode_literals import os from celery import Celery# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyBlog.settings')app = Celery('task') # 项目名
# Using a string here means the worker don't have to serialize
# the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # 使用django的session# Load task modules from all registered Django app configs.
app.autodiscover_tasks() # 项目下所有app的celery任务发现 @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) 2.在django的settings配置celery的连接 #for celery CELERY_BROKER_URL = 'redis://:admin@192.168.80.128:6379/0' CELERY_RESULT_BACKEND = 'redis://:admin@192.168.80.128:6379/0' 3.设置celery随django项目一起启动 from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app'] 4.具体的app里在tasks.py写要执行的任务,以被worker执行 # Create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task # 多个app共享worker@shared_task
def add(x,y): return x + y@shared_task
def mul(x, y): return x * y@shared_task
def xsum(numbers): return sum(numbers) 5.在app里写django的views,然后调用上面写好的celery task,这样访问页面时自动发布任务 from django.http import HttpResponse from celery.result import AsyncResult from .celery_task import addtask_id=None
def celery_pub(request): task = add.delay(22,23) global task_id task_id = task.id # 拿到任务id即可返回,不用等待get获取值,以后再调用id拿值 return HttpResponse(task.id)def celery_get(request):
global task_id result = AsyncResult(id=task_id) return HttpResponse(result.get()) 6.进入Django根目录,通过cmd启动worker以接收并执行任务 celery -A 项目名 worker -l info # 访问页面拿到id,再通过另一个请求拿到结果。五、Django定时任务 1.添加django_celery_beat模块到django的INSTALLED_APPS INSTALLED_APPS = ( ..., 'django_celery_beat', ) 2.创建数据库中的表格 python manage.py migrate 3.在admin中添加定时任务(已注册的任务)到数据库 4.启动beat去数据库取任务 celery -A MyBlog beat -l info -S django 5.启动worker即可