博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQie消息列队整理
阅读量:7032 次
发布时间:2019-06-28

本文共 4730 字,大约阅读时间需要 15 分钟。

使用方法过程,这儿只做了windows平台教程  

先安装Erlang 编程软件,然后设置环境变量,在安装RabbimMQ  ,这儿我下载了一个版本不行,后来换了最新版就好了,以后在使用过程 中如果有问题 ,可以换版本试一下,这是个坑。。然后 pip install pipk   

在编程器中粘上下代码测试  先是服务端:

import pika#连接队列服务器connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()#创建队列。有就不管,没有就自动创建channel.queue_declare(queue='hello')#使用默认的交换机发送消息。exchange为空就使用默认的channel.basic_publish(exchange='',                      routing_key='hello',                      body='Hello 201!')print(" [x] Sent 'Hello World!'")connection.close()
View Code

在另一个文件粘上客服端:

 

import pikaimport time# 连接服务器connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# rabbitmq消费端仍然使用此方法创建队列。这样做的意思是:若是没有就创建。和发送端道理道理。目的是为了保证队列一定会有channel.queue_declare(queue='hello')# 收到消息后的回调def callback(ch, method, properties, body):    print("我收到了,正在等。。")    time.sleep(10)    print(" [x] Received %r" % body)channel.basic_consume(callback,                      queue='hello',                      #no_ack=True                       )print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
View Code
------------------------------------------------------------------------------ 以下是注释:
channel.basic_consume(callback,                       queue='hello',                       #no_ack=True   如果取消注释就会不等反应,如果客                       #服端开闭,没有外理的消息会丢失                        ) 在安装rabbitmq文件夹下有个叫rebbitmqctl的命令可以管里查看消息队列
消息持久化,持久化客户端,就是客户端关机数据没有处理,就会丢失,默认有机制解决这个问题,客户端关机,函数没有处理完会 发给另一个客户端,不想发给另一台,数据不重要直接写上:
channel.basic_consume(callback,                       queue='hello',      no_ack=True,                       #默认上面红色这句                        )
如果是防止服务端死机: 需要在二个地方修改,
 
channel.queue_declare(queue='hello'#durable = True,客户端和服务端在申明队列时都要写上,表示这个列队持 久 。。但是要消失都持久,还要在服各务端上:
hannel.basic_publish(exchange='',                       routing_key='hello',                       body="wwwwwwwwwww",                       properties=pika.BasicProperties(                           delivery_mode= 2                       )                       )
) 如果客户端处理不过来,,就先不要发消息过的解决方法,只需要在客户端的发送消息前加入: channel.basic_qos(prefetch_count=1) 代码片断如下:
收到消息后的回调 def callback(ch, method, properties, body):     print("我收到了,正在等。。")     time.sleep(10)     print(" [x] Received %r" % body)     #ch.basic_ack(delivery_tag=method.delivery_tag)#这句在我的版本内没有实际用 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,                       queue='hello',                       no_ack=True,                        ) 广播效果要用到到exchange,可以设规测,转发器:   有三个参数:   fanout 所有   direct  指定   topic规则 发送方,
channel.exchange_declare(exchange='direct_logs',                            type='direct') #py3中type是关健字,需要加成, exchange_type=    然后发送那儿:
channel.basic_publish(exchange='direct_logs',                       routing_key=""   #为空                       body=message) 服务端完成代码:
import pikaimport sys connection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel() channel.exchange_declare(exchange='direct_logs',                         type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='direct_logs',                      routing_key=severity,                      body=message)print(" [x] Sent %r:%r" % (severity, message))connection.close()
View Code

 

 
消费方要改的多一点:第一个地方
channel.exchange_declare(exchange='direct_logs',                            type='direct') #py3中type是关健字,需要加成, exchange_type=  第二个地方 在下面加上: result = channel.queue_declare(exclusive = True) queur_name = result.method.queue channel.queue_bind(exchange = 'loags'                                queue = queue_name      ) 完成代码:
import pikaimport sys connection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel() channel.exchange_declare(exchange='direct_logs',                         type='direct') result = channel.queue_declare(exclusive=True)queue_name = result.method.queue severities = sys.argv[1:]if not severities:    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])    sys.exit(1) for severity in severities:    channel.queue_bind(exchange='direct_logs',                       queue=queue_name,                       routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True) channel.start_consuming()

首先我写博客其实就是学习笔记,写到这儿赖了,,多对多,双向通信有空在补上。。。。。。待续

 
 
 
 
 

转载于:https://www.cnblogs.com/fgxwan/p/9661371.html

你可能感兴趣的文章
自动补0
查看>>
Python如何批量给云主机配置安全组?
查看>>
iOS音频播放 (四):AudioFile
查看>>
Oracle 存储过程异常处理
查看>>
URL里面存在下划线导致SESSION在IE下被分隔
查看>>
自定义注册的 Windows服务 无法访问网络共享文件解决办法
查看>>
postgersql日志备份和还原
查看>>
简单的pythonweb程序
查看>>
RemoteView概述
查看>>
JAVA集合小结
查看>>
ubuntu下android 源码下载
查看>>
Oracle数据库角色管理
查看>>
订单系统 高级设计
查看>>
flutter 底部输入框 聊天输入框 Flexible
查看>>
mac安装thrift 0.93
查看>>
cxf客户端代码自动生成
查看>>
sql语句的分页技术
查看>>
android定位和地图开发实例
查看>>
Spring从入门到精通视频教程合集
查看>>
mtr 命令详解(跟踪路由)
查看>>