django+celery实现分布式任务
想用django做一个自动运维平台,利用netsnmp来获取交换机及服务器信息,但是snmpget任务需要在后台实时运行,为了不影响html响应,利用celery来结合django做异步任务队列。
10年积累的做网站、成都网站制作经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计后付款的网站建设流程,更有北辰免费网站建设让你可以放心的选择与我们合作。
一、环境准备
1.首先安装celery
pip3 install celery
2.安装djcelery
pip3 install django-celery
3.安装一个broker
我们必须拥有一个broker消息队列用于发送和接收消息。Celery官网给出了多个broker的备选方案:RabbitMQ、redis、Database(不推荐)以及其他的消息中间件。本次我们利用redis
sudo apt-get install redis
redis-server
启动redis服务, 端口假设为6379
redis-cli 查看redis 状态
root@ubuntu:~/Desktop/webserver/myserver# redis-cli
127.0.0.1:6379>
二、django 配置
配置settings.py
首先,在Django工程的settings.py文件中加入如下配置代码:
celery 配置信息 start
#############################
import djcelery
celery 配置
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_IMPORTS = ('serverapp.task')
#############################
celery 配置信息 end
#############################
当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task
BROKER_URL:broker是代理人,它负责分发任务给worker去执行。我使用的是Redis作为broker
没有设置 CELERY_RESULT_BACKEND,默认没有配置,此时Django会使用默认的数据库(也是你指定的orm数据库)。
CELERY_IMPORTS:是导入目标任务文件
CELERYBEAT_SCHEDULER:使用了django-celery默认的数据库调度模型,任务执行周期都被存在默认指定的orm数据库中.
CELERYBEAT_SCHEDULE:设置定时的时间配置, 可以精确到秒,分钟,小时,天,周等。
3.在主工程的配置文件settings.py 中应用注册表INSTALLED_APPS中加入 djcelery
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'djcelery', #加入djcelery
]
4.(3)创建应用实例
在主工程目录添加celery.py, 添加自动检索django工程tasks任务
vim artproject/celery.py
#目的是拒绝隐士引入,celery.py和celery冲突。
from futureimport absolute_import
import os
from celery import Celery, platforms
platforms.C_FORCE_ROOT = True
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myserver.settings')
#Specifying the settings here means the celery command line program will know where your Django project is.
#This statement must always appear before the app instance is created, which is what we do next:
from django.conf import settings
app = Celery('serverapp')
app.config_from_object('django.conf:settings')
#This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.
#You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#With the line above Celery will automatically discover tasks in reusable apps if you define all tasks in a separate tasks.py module.
#The tasks.py should be in dir which is added to INSTALLED_APP in settings.py.
#So you do not have to manually add the individual modules to the CELERY_IMPORT in settings.py.
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request)) #dumps its own request information
5.(4) 创建任务 tasks
每个任务本质上就是一个函数,在tasks.py中,写入你想要执行的函数即可。
在应用art中添加我们需要提供的异步服务和定时服务
vim art/tasks.py
#!/usr/bin/env python
encoding: utf-8
from futureimport absolute_import
import time
from django.core.mail import send_mail
from celery.utils.log import get_task_logger
from artproject.celery import app
from art.utils.send_mail import pack_html, send_email
@app.task
br/>
@app.task
url = "http://1000phone.com"
receiver = 'zhouguangyou@1000phone.com'
content = pack_html(receiver, url)
content = 'this is email content.'
send_email(receiver, content)
print('send email ok!')
@app.task
br/>
@app.task
return x+y
6.迁移生成celery需要的数据表
python manage.py migrate
此时数据库表结构多出了几个
7.
8.
9.启动服务,测试
我们可以采用 python manage.py help 发现多出了 celery 相关选项。
(1)启动django celery 服务
启动服务:
python manage.py celery worker --loglevel=info
此时异步处理和定时处理服务都已经启动了
(2)web端接口触发异步任务处理
我们在web端加入一个入口,触发异步任务处理add函数
在应用art的urls.py 中加入如下对应关系
from art.views import add_handler
url(r'^add', add_handler),
art/views.py 中加入处理逻辑
def add_handler(request):
x = request.GET.get('x', '1')
y = request.GET.get('y', '1')
from .tasks import add
add.delay(int(x), int(y))
res = {'code':200, 'message':'ok', 'data':[{'x':x, 'y':y}]}
return HttpResponse(json.dumps(res))
启动web服务,通过url传入的参数,通过handler的add.delay(x, y)计算并存入MySQL
http://127.0.0.1:8000/art/add?x=188&y=22
(4) 测试定时器,发送邮件
在终端输入 python manage.py celerybeat -l info
会自动触发每隔30s执行一次tsend_email定时器函数,发送邮件:
CELERYBEAT_SCHEDULE = { #定时器策略
#定时任务一: 每隔30s运行一次
u'测试定时器1': {
"task": "art.tasks.tsend_email",
#"schedule": crontab(minute='*/2'), # or 'schedule': timedelta(seconds=3),
"schedule":timedelta(seconds=30),
"args": (),
},
}
具体发送邮件服务程序见下面的第4节
4 邮件发送服务
项目中经常会有定时发送邮件的情形,比如发送数据报告,发送异常服务报告等。
可以编辑文件 art/utils/send_mail.py, 内容编辑如下:
#!/usr/bin/env python
#-- coding:utf-8 --
#written by zhouguangyou
#发送邮件(wd_email_check123账号用于内部测试使用,不要用于其他用途)
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.header import Header
import time
sender = 'wwwwww@163.com'
subject = u'api开放平台邮箱验证'
smtpserver = 'smtp.163.com'
username = 'wwwwww'
password = 'wwwww1234'
mail_postfix="163.com"
def send_email(receiver, content):
try:
me = username+"<"+username+"@"+mail_postfix+">"
msg = MIMEText(content, 'html', 'utf-8')
msg['Subject'] = subject
msg['From'] = sender
msg['To'] = receiver
smtp = smtplib.SMTP()
smtp.connect(smtpserver)
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()
return True
except Exception as e:
print('send_email has error with : ' + str(e))
return False
def pack_html(receiver, url):
html_content = u"
" \
"
" \
"
" \
"
" \
"
" \
"
"" % (receiver, url, url)
html_content = html_content
return html_content
if name== "main":
url = "http://xxxx.com"
receiver = 'xxx@126.com'
#content = pack_html(receiver, url)
content = 'this is email content. at %s.'%int(time.time())
send_email(receiver, content)
至此,在celery ui界面可以看到两类,定时器处理和异步处理。
网页题目:django+celery实现分布式任务
分享链接:http://scyanting.com/article/jsjdop.html