rabbitMQ 的基礎知識這里就不闡述了,可以參看我早年寫的文章 -> rabbitMQ 入門
https://blog.csdn.net/weixin_41997073/article/details/118724779
Celery 官網:http://www.celeryproject.org/
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery/
celery 還有定時運行,redis 緩沖使用等高級特性,可以參看 Celery的官網進行研究。
這里講些 rabbitMQ 如何與程序嵌入,使用的是 django 的基礎框架
django 使用celery 插件,在配置文件配置,并啟動 celery 就可以與 rabbitMQ 進行交互
配置如下
INSTALLED_APPS = ['rest_framework',
]
from celery import Celery
from kombu import QueueCELERY_WORKER_POOL = 'prefork' # 禁用gevent池
CELERY_BROKER_URL = 'amqp://test:test_password@localhost:5672/test_vhost'
CELERY_RESULT_BACKEND = 'django-db' # 使用數據庫存儲結果
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXPIRES = 3600 # 任務結果保存1小時CELERY_WORKER_CONCURRENCY = 4
# [ IO 密集型, 每個worker進程可以預取 并發數 × 預乘數 個任務到內存中 ]
# 設置為1000,表示每個worker進程處理1000個任務后會自動重啟
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
CELERY_TASK_ACKS_LATE = True# Celery 隊列配置
CELERY_TASK_QUEUES = (Queue('default', routing_key='task.default'),Queue('high_priority', routing_key='task.high_priority'),Queue('celery', routing_key='task.celery'),
)# Celery 路由配置
CELERY_TASK_ROUTES = {'metadata.tasks.process_rabbitmq_message': {'queue': 'high_priority','routing_key': 'task.high_priority'},'metadata.tasks.async_update_record': {'queue': 'default','routing_key': 'task.default'},
}# CELERY_TASK_QUEUES = (
# Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
# Queue('default', Exchange('default'), routing_key='default'),
# Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
# )MONKEY_PATCH_SETTINGS = {'subprocess': False,'thread': False, # 禁用線程補丁
}
注意:
django創建的數據模型需要手動遷移,還有rabbitMQ 的虛擬主機和隊列也需要進行驗證,確保可用。
在 django 的主入口 url 目錄中建立celery 腳本 ,并在該目錄的初始化 init 配置文件中添加 celery 信息
就可以在 django 的 manage.py 平級目錄使用 celery 進行啟動與 rabbitMQ 的交互 。
celery -A djangoProjectMetaDataManagement worker --loglevel=info -Q high_priority,defaultcelery -A djangoProjectMetaDataManagement.celery worker --loglevel=info為了安裝考慮看情況也可以創建一個獨立的用戶運行celery
# 創建專用用戶
useradd celeryuser# 使用非 root 用戶運行
celery -A djangoProjectMetaDataManagement.celery worker --loglevel=info --uid=celeryuser --gid=celeryuser
celery.py
import os
from celery import Celery
from kombu import QueueCELERY_BROKER_URL = 'amqp://test:test_password@localhost/test_vhost'
CELERY_RESULT_BACKEND = Noneos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProjectMetaDataManagement.settings')app = Celery('djangoProjectMetaDataManagement')
#app.config_from_object('django.conf:settings', namespace='CELERY')# 使用 Django 的設置文件配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')app.conf.update(worker_prefetch_multiplier=1, # 每個Worker進程每次只預取1個任務worker_max_tasks_per_child=100, # 可選:限制每個Worker進程處理的任務數后重啟
)app.conf.task_queues = (Queue('default'