欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 运维知识 > windows >内容正文

windows

用 Celery 实现邮件推送系统

发布时间:2025/7/14 windows 54 豆豆
生活随笔 收集整理的这篇文章主要介绍了 用 Celery 实现邮件推送系统 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

2019独角兽企业重金招聘Python工程师标准>>>

系统需求

本文以Celery 实现分布式任务队列为基础,简述了一个邮件推送系统的模型。

Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。

需求:

1.在邮件推送系统中,我们需要对成千上万的用户发送邮件,发送邮件具有时效性,即不能说今天开始发邮件,要等到明天才能发送完毕。

2.发送邮件过程中,可能会遇到过于频繁,邮件服务器上信件堆积无法及时接受新信件而产生的拒信,或者邮件服务器将我们的邮件判决为垃圾邮件。

3.邮件发送的 I/O 时间较长,不能让程序在等待邮件服务器返回消息上浪费时间。

所以我们的推送系统要有以下特性:1.分布式处理作业;2.闭环监控;3.异步式分发作业

系统框图

前端通过 ajax 调用 views 中的 callpush 接口,该接口将被推送用户的筛选条件传入 service,然后 service 请求数据库,将返回数据作为参数调用 celery 接口中 addtask 函数。celery 接口中 addtask 根据 action 参数来判断所要添加的任务类型,根据不同的类型分别进行处理,放入队列。

系统的另外一头,worker 从队列中取出任务,用 mail 函数推送邮件,如果发送失败就调用 error_handler 进行异常处理,此处我们将所有 task 的执行情况放入 redis 中,给每个任务进行标记,如果成功则标记为 1,失败则 0.

前端可以通过 ajax 调用 pushstatus 来向 redis 中读取任务执行情况,此处我们返回了成功和失败任务的个数。

伪代码实现

# Controller from redis import StrictRedis red = StrictRedis(host='localhost', port=6379, db=0)def callpush(request):area = request.POST.get('area')return HttpResponse(str(mailpush(area)))def pushstatus(request):failure = red.scard('status:0:task')success = red.scard('status:1:task')return HttpResponse('Failures: ' + str(failure) + '\nSuccess: ' + str(success))# Service def mailpush(**kargs):targets = MtUser.objects.filter(kargs).values('username', 'address')addtask(action='mailpush', data=targets, content='Hello %s!', subject='Greetings')return len(targets)# Celery Interface (Dispatcher) from celery import Celeryapp = Celery() app.config_from_object('celeryconfig')def addtask(action, data, **kargs):if action == 'mailpush':for (address, username) in data:app.send_task('worker.mail', args=[kargs['subject'], kargs['content'] % username, address], link_error=app.signature('worker.error_handler'))elif action == 'messagepush':passelse:pass# Celery Backend (Worker) from celery import Celery from celery import Task from redis import StrictRedisapp = Celery() app.config_from_object('celeryconfig') red = StrictRedis(host='localhost', port=6379, db=0)@app.task(bind=True) def mail(self, subject, content, address):from django.core.mail import EmailMessagemsg = EmailMessage(subject, content, 'admin@admin.com', address)msg.content_subtype = 'html'msg.send()red.sadd('status:1:task', self.request.id)# Overwrite the on_failure function in trace.py @app.task def error_handler(uuid, args):print uuidprint argsred.set(uuid, args)red.sadd('status:0:task', uuid)red.srem('status:1:task', uuid)

转载于:https://my.oschina.net/shinedev/blog/500554

总结

以上是生活随笔为你收集整理的用 Celery 实现邮件推送系统的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。