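RabbitMQ is mainly used here for multi-process and distributed workloads.

The setup is one-to-many: one producer and multiple consumers.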
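To make the one-producer, many-consumers idea concrete, here is a minimal pure-Python simulation that does not talk to RabbitMQ at all. It assumes each consumer holds at most one unacknowledged message, so the next message goes to whichever worker frees up first. The function name `simulate_fair_dispatch` and the worker names are made up for illustration.

```python
import heapq


def simulate_fair_dispatch(messages, worker_speeds):
    """Simulate handing out one unacknowledged message per worker.

    worker_speeds maps a (hypothetical) worker name to the seconds it
    needs per message. Returns worker name -> list of handled messages.
    """
    # Each heap entry is (time at which the worker becomes free, worker name).
    free_at = [(0.0, name) for name in sorted(worker_speeds)]
    heapq.heapify(free_at)
    handled = {name: [] for name in worker_speeds}
    for message in messages:
        # The next message goes to the worker that becomes free first.
        now, name = heapq.heappop(free_at)
        handled[name].append(message)
        heapq.heappush(free_at, (now + worker_speeds[name], name))
    return handled


result = simulate_fair_dispatch(range(6), {"fast": 1.0, "slow": 5.0})
print(result)
```

In this toy run the fast worker ends up with most of the messages, which is the practical benefit of adding consumers with different speeds to one queue.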

Install

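  • Install the Erlang environment first; note that the Erlang version must be compatible with the RabbitMQ version.
  • pip install pika
  • Create a new account: the guest account cannot be used to connect (by default RabbitMQ only accepts it from localhost).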
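Since a dedicated account is needed, one option is to keep its credentials out of the scripts. The sketch below reads them from environment variables and rejects `guest` for a non-local host. The variable names (`RABBITMQ_HOST`, `RABBITMQ_USER`, and so on) and the function name are assumptions for illustration, not something pika or RabbitMQ defines.

```python
import os


def load_rabbitmq_settings(env=os.environ):
    """Collect connection settings from environment variables.

    The variable names are hypothetical; adapt them to your deployment.
    """
    settings = {
        "host": env.get("RABBITMQ_HOST", "127.0.0.1"),
        "port": int(env.get("RABBITMQ_PORT", "5672")),
        "username": env.get("RABBITMQ_USER", "guest"),
        "password": env.get("RABBITMQ_PASSWORD", "guest"),
    }
    is_local = settings["host"] in ("127.0.0.1", "localhost", "::1")
    if settings["username"] == "guest" and not is_local:
        # By default the broker only accepts guest over the loopback interface.
        raise ValueError("guest only works from localhost; use a dedicated account")
    return settings
```

The resulting values can then be passed to `pika.PlainCredentials(username, password)` and `pika.ConnectionParameters(host=..., port=..., credentials=...)`.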

Send

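import json

import pandas as pd
import pika

# Open a socket connection to the broker
credentials = pika.PlainCredentials("root", "root")
# connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1", credentials=credentials))

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.147.17.42', port=5672, virtual_host='/', credentials=credentials, heartbeat=0))

# Open a channel (RabbitMQ protocol) on the connection
channel = connection.channel()

# Declare the queue with the same arguments as the consumer,
# so messages are not dropped if the queue does not exist yet
channel.queue_declare(queue='task_queue')

messages = ...  # elided in the original; an iterable of str/bytes message bodies

for message in messages:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        # delivery_mode=2 marks the message as persistent
        # (it only survives a broker restart if the queue is durable too)
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(" [x] Sent %r" % message)
connection.close()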
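The sender imports `json`, which suggests the message bodies are JSON. A minimal sketch of how structured payloads could be turned into bytes for `body=` and decoded again in the consumer, assuming UTF-8 JSON is the wire format. The helper names `encode_message` and `decode_message` are hypothetical.

```python
import json


def encode_message(payload):
    """Serialize a JSON-compatible object to UTF-8 bytes for body=."""
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


def decode_message(body):
    """Inverse of encode_message, for use inside the consumer callback."""
    return json.loads(body.decode("utf-8"))


body = encode_message({"task": "resize", "id": 1})
print(decode_message(body))
```

Keeping both helpers in one shared module is one way to ensure producer and consumers agree on the format.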

Receive

import json
import time

import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('10.147.17.42', 5672, '/', user_info, heartbeat=0))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
print(' [*] Waiting for messages.')


def callback(ch, method, properties, body):
    message = body.decode()
    print(" [x] Received %r" % message)
    # Manually acknowledge that the message has been received and processed,
    # so RabbitMQ can remove it from the queue
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Deliver at most one unacknowledged message to this consumer at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

Notice

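  • A consumer receives the next message only after it has acknowledged the current one (the effect of basic_qos(prefetch_count=1) combined with manual acknowledgement).
    • ch.basic_ack(delivery_tag=method.delivery_tag)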
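Because the acknowledgement controls when the next message arrives, it helps to decide what happens when processing fails. The sketch below wraps a handler in a pika-style callback that acknowledges on success and negatively acknowledges with requeue on failure. `make_callback` and `handler` are hypothetical names, while `basic_ack` and `basic_nack` are the channel methods pika provides.

```python
def make_callback(handler):
    """Wrap handler(message) in a callback suitable for on_message_callback."""
    def callback(ch, method, properties, body):
        try:
            handler(body.decode())
        except Exception:
            # Processing failed: reject and requeue so the message is redelivered.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            # Processing succeeded: the broker may now drop the message
            # and deliver the next one.
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

It could be passed as `on_message_callback=make_callback(my_handler)`. Note that a message which always fails would be requeued indefinitely with this approach, so a real consumer probably wants a retry limit or `requeue=False`.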