Python rabbitmq的使用(二)

上一篇介绍了rabbitmq的安装和经典的hello world!实例。这里将对工作队列(work queues)做一个了解。因为是接上一篇说明的,所以如果没看过上一篇,看这篇可能会比较难理解。上一篇的地址是:ubuntu安装rabbitmq和python的使用实现

消息也可以理解为任务,消息发送者可以理解为任务分配者,消息接收者可以理解为工作者,当工作者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,于是就需要多个工作者来共同处理这些任务,这些工作者,就称为工作队列。结构图如下:

246.jpg

rabbitmq的python实例工作队列

准备工作(Preparation)

立即学习“Python免费学习笔记(深入)”;

在实例程序中,用new_task.py来模拟任务分配者, worker.py来模拟工作者。

修改send.py,从命令行参数里接收信息,并发送

import sysmessage= ' '.join(sys.argv[1:])or "Hello World!"channel.basic_publish(exchange='',routing_key='hello',body=message)print " [x] Sent %r" % (message,)

登录后复制

修改receive.py的回调函数。

import timedef callback(ch, method, properties, body):print " [x] Received %r" % (body,)time.sleep( body.count('.') )print " [x] Done"

登录后复制

这边先打开两个终端,都运行worker.py,处于监听状态,这边就相当于两个工作者。打开第三个终端,运行new_task.py

$ python new_task.py First message.$ python new_task.py Second message..$ python new_task.py Third message...$ python new_task.py Fourth message....$ python new_task.py Fifth message.....

登录后复制

观察worker.py接收到任务,其中一个工作者接收到3个任务 :

$ python worker.py[*] Waiting for messages. To exit press CTRL+C[x] Received 'First message.'[x] Received 'Third message...'[x] Received 'Fifth message.....'

登录后复制

另外一个工作者接收到2个任务 :

$ python worker.py[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..'[x] Received 'Fourth message....'

登录后复制

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):print " [x] Received %r" % (body,)time.sleep(5)print " [x] Done"ch.basic_ack(delivery_tag= method.delivery_tag)

登录后复制

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

登录后复制

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)

登录后复制

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)

登录后复制

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',routing_key="task_queue",body=message,properties=pika.BasicProperties(delivery_mode= 2,# make message persistent))

登录后复制

公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)

登录后复制

new_task.py完整代码

#!/usr/bin/env pythonimport pikaimport sysconnection= pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel= connection.channel()channel.queue_declare(queue='task_queue', durable=True)message= ' '.join(sys.argv[1:])or "Hello World!"channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties(delivery_mode= 2,# make message persistent))print " [x] Sent %r" % (message,)connection.close()

登录后复制

worker.py完整代码

#!/usr/bin/env pythonimport pikaimport timeconnection= pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel= connection.channel()channel.queue_declare(queue='task_queue', durable=True)print ' [*] Waiting for messages. To exit press CTRL+C'def callback(ch, method, properties, body):print " [x] Received %r" % (body,)time.sleep( body.count('.') )print " [x] Done"ch.basic_ack(delivery_tag= method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(callback,queue='task_queue')channel.start_consuming()

登录后复制

以上就是Python rabbitmq的使用(二)的内容,更多相关内容请关注PHP中文网(www.php.cn)!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2280642.html

(0)
上一篇 2025年2月27日 18:07:52
下一篇 2025年2月26日 06:32:13

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • Python rabbitmq的使用(四)

    第三篇说明了关于交换机的使用,已经能实现给所有接收端发送消息,但是如果需要自由定制,有的消息发给其中一些接收端,有些消息发送给另外一些接收端,要怎么办呢?这种情况下就要用到路由键了。 路由键的工作原理:每个接收端的消息队列在绑定交换机的时候…

    编程技术 2025年2月27日
    200
  • Python中RabbitMQ的操作图文代码详解

    知识准备 RabbitMQ rabbitmq是一个在amqp基础上完整的,可复用的企业消息系统。 MQ MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。 消息队列使用发布订-阅模式工作。消息发送者…

    2025年2月27日
    200

发表回复

登录后才能评论