欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

tornado celery mysql_Python3.7+Tornado5+Celery3+Rabbitmq3实现异步队列任务

发布时间:2025/3/12 32 豆豆
生活随笔 收集整理的这篇文章主要介绍了 tornado celery mysql_Python3.7+Tornado5+Celery3+Rabbitmq3实现异步队列任务 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

在之前的一篇文章中提到了用Django+Celery+Redis实现了异步任务队列,只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。

首先安装rabbitmq

Mac os直接运行brew命令安装

#安装服务

brew install rabbitmq

#启动服务

brew services start rabbitmq

Win10系统就要下载安装包进行安装了,由于rabbitmq是基于erlang的,所以要首先安装erlang

1、首先,下载并运行Erlang for Windows 安装程序 (地址:http://www.erlang.org/downloads)下载完毕并安装(注意:安装目录请选择默认目录)

安装成功后,启用web管理UI,进入RabbitMQ Serverrabbitmq_server-3.6.6sbin,输入命令rabbitmq-plugins enable rabbitmq_management

在系统的开始菜单里找到RabbitMQ的启动菜单,启动服务

浏览器输入,http://localhost:15672/,使用默认用户guest/guest进入网页端控制台:

代表没有问题了

然后安装tornado和celery,注意指定版本号

pip3 install tornado==5.1.1

pip3 install celery ==3.1

pip3 install pika ==0.9.14

pip3 install tornado-celery

pip3 install flower

需要注意一点,由于python3.7中async已经作为关键字存在,但是有的三方库还没有及时修正,导致它们自己声明的变量和系统关键字重名,所以我们要深入三方库的源码,帮他们修改async关键字为async_my,需要修改的文件夹和文件包含但不限于:

/site-packages/pika/adapters/libev_connection.py

/site-packages/celery下面的文件

/site-packages/kombu下面的文件夹

在tornado项目下新建一个任务队列文件task.py:

import time

from celery import Celery

from func_tool import mail

C_FORCE_ROOT=True

celery = Celery("tasks", broker="amqp://guest:guest@localhost:5672")

celery.conf.CELERY_RESULT_BACKEND = "amqp"

@celery.task

def sleep(seconds):

time.sleep(float(seconds))

return seconds

@celery.task

def sendmail(title,text,tomail):

mail(title,text,tomail)

return '发送邮件成功'

if __name__ == "__main__":

celery.start()

然后编写服务端代码:

from celery import Celery

from tornado import gen

import tcelery

sys.path.append("..")

import task

#异步任务

class CeleryHandler(BaseHandler):

@gen.coroutine

def get(self):

response = yield gen.Task(task.sendmail.apply_async,args=['你好','非常好','164850527@qq.com'])

self.write('ok')

self.finish()

路由器代码:

import tornado.web

from views import Index

import config

#路由

class Application(tornado.web.Application):

def __init__(self):

handlers = [

(r"/celery", Index.CeleryHandler)

]

super(Application,self).__init__(handlers,**config.setting)

程序入口代码server.py:

import tornado.ioloop

import tornado.httpserver

import config

from application import Application

if __name__ == "__main__":

print('启动...')

app = Application()

httpServer = tornado.httpserver.HTTPServer(app)

# httpServer.listen(8888)

#绑定端口

httpServer.bind(config.options['port'])

#开启5个子进程(默认1,若为None或者小于0,开启对应硬件的CPU核心数个子进程)

httpServer.start(1)

tornado.ioloop.IOLoop.current().start()

进入项目目录,分别启动tornado服务,celery服务,以及flower服务

python server.py

celery -A task worker --loglevel=info

celery flower -A task --broker=amqp://guest:guest@localhost:5672//

访问网址http://localhost:8000/celery 用来触发异步任务

后台服务显示任务返回值:

进入flower在线任务监控网址:http://localhost:5555/

至此,整个流程就走完了。

总结

以上是生活随笔为你收集整理的tornado celery mysql_Python3.7+Tornado5+Celery3+Rabbitmq3实现异步队列任务的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。