使用方法过程,这儿只做了windows平台教程
先安装Erlang 编程软件,然后设置环境变量,在安装RabbimMQ ,这儿我下载了一个版本不行,后来换了最新版就好了,以后在使用过程 中如果有问题 ,可以换版本试一下,这是个坑。。然后 pip install pipk
在编程器中粘上下代码测试 先是服务端:
import pika#连接队列服务器connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()#创建队列。有就不管,没有就自动创建channel.queue_declare(queue='hello')#使用默认的交换机发送消息。exchange为空就使用默认的channel.basic_publish(exchange='', routing_key='hello', body='Hello 201!')print(" [x] Sent 'Hello World!'")connection.close()
在另一个文件粘上客服端:
import pikaimport time# 连接服务器connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# rabbitmq消费端仍然使用此方法创建队列。这样做的意思是:若是没有就创建。和发送端道理道理。目的是为了保证队列一定会有channel.queue_declare(queue='hello')# 收到消息后的回调def callback(ch, method, properties, body): print("我收到了,正在等。。") time.sleep(10) print(" [x] Received %r" % body)channel.basic_consume(callback, queue='hello', #no_ack=True )print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
------------------------------------------------------------------------------ 以下是注释:
channel.basic_consume(callback, queue='hello', #no_ack=True 如果取消注释就会不等反应,如果客 #服端开闭,没有外理的消息会丢失 ) 在安装rabbitmq文件夹下有个叫rebbitmqctl的命令可以管里查看消息队列
消息持久化,持久化客户端,就是客户端关机数据没有处理,就会丢失,默认有机制解决这个问题,客户端关机,函数没有处理完会 发给另一个客户端,不想发给另一台,数据不重要直接写上:
channel.basic_consume(callback, queue='hello', no_ack=True, #默认上面红色这句 )
如果是防止服务端死机: 需要在二个地方修改,
channel.queue_declare(queue='hello'#durable = True,客户端和服务端在申明队列时都要写上,表示这个列队持 久 。。但是要消失都持久,还要在服各务端上:
hannel.basic_publish(exchange='', routing_key='hello', body="wwwwwwwwwww", properties=pika.BasicProperties( delivery_mode= 2 ) )
) 如果客户端处理不过来,,就先不要发消息过的解决方法,只需要在客户端的发送消息前加入: channel.basic_qos(prefetch_count=1) 代码片断如下:
收到消息后的回调 def callback(ch, method, properties, body): print("我收到了,正在等。。") time.sleep(10) print(" [x] Received %r" % body) #ch.basic_ack(delivery_tag=method.delivery_tag)#这句在我的版本内没有实际用 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=True, ) 广播效果要用到到exchange,可以设规测,转发器: 有三个参数: fanout 所有 direct 指定 topic规则 发送方,
channel.exchange_declare(exchange='direct_logs', type='direct') #py3中type是关健字,需要加成, exchange_type= 然后发送那儿:
channel.basic_publish(exchange='direct_logs', routing_key="" #为空 body=message) 服务端完成代码:
import pikaimport sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)print(" [x] Sent %r:%r" % (severity, message))connection.close()
消费方要改的多一点:第一个地方
channel.exchange_declare(exchange='direct_logs', type='direct') #py3中type是关健字,需要加成, exchange_type= 第二个地方 在下面加上: result = channel.queue_declare(exclusive = True) queur_name = result.method.queue channel.queue_bind(exchange = 'loags' queue = queue_name ) 完成代码:
import pikaimport sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True)queue_name = result.method.queue severities = sys.argv[1:]if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
首先我写博客其实就是学习笔记,写到这儿赖了,,多对多,双向通信有空在补上。。。。。。待续