direct关键字发布订阅模式
基本用法
发布者
import json
from rabbitmq import pika
import rabbitmq
credentials = rabbitmq. PlainCredentials( 'zhangdapeng' , 'zhangdapeng520' ,
)
connection_target = rabbitmq. ConnectionParameters( host= '127.0.0.1' , port= 5672 , virtual_host= '/' , credentials= credentials,
)
connection = rabbitmq. BlockingConnection( connection_target)
exchange_name = "user_manager_direct"
channel = connection. channel( )
channel. exchange_declare( exchange= exchange_name, exchange_type= rabbitmq. ExchangeType. direct)
user = { "id" : 1 , "name" : "张三" , "age" : 23 }
message = json. dumps( user, ensure_ascii= True )
channel. basic_publish( exchange= exchange_name, routing_key= "error" , body= message. encode( 'utf8' ) , properties= pika. BasicProperties( delivery_mode= 2 ) ,
)
channel. basic_publish( exchange= exchange_name, routing_key= "info" , body= message. encode( 'utf8' ) , properties= pika. BasicProperties( delivery_mode= 2 ) ,
)
print ( message)
connection. close( )
消费者
import rabbitmq
import json
credentials = rabbitmq. PlainCredentials( 'zhangdapeng' , 'zhangdapeng520' ,
)
target = rabbitmq. ConnectionParameters( host= '127.0.0.1' , port= 5672 , virtual_host= '/' , credentials= credentials,
)
connection = rabbitmq. BlockingConnection( target)
channel = connection. channel( )
exchange_name = "user_manager_direct"
channel. exchange_declare( exchange= exchange_name, exchange_type= rabbitmq. ExchangeType. direct,
)
result = channel. queue_declare( queue= "" , exclusive= True ,
)
queue_name = result. method. queue
channel. queue_bind( exchange= exchange_name, queue= queue_name, routing_key= "error" ,
)
channel. queue_bind( exchange= exchange_name, queue= queue_name, routing_key= "info" ,
) def callback ( ch, method, properties, body) : """每次接收到消息的消费回调方法""" ch. basic_ack( delivery_tag= method. delivery_tag) data = body. decode( "utf8" ) print ( json. loads( data) )
channel. basic_consume( queue= queue_name, on_message_callback= callback, auto_ack= False ,
)
try : channel. start_consuming( )
finally : connection. close( )
简化代码
生产者
import json
from rabbitmq import pika
import rabbitmq
connection = rabbitmq. get_connection( )
exchange_name = "user_manager_direct"
channel = connection. channel( )
channel. exchange_declare( exchange= exchange_name, exchange_type= rabbitmq. ExchangeType. direct)
user = { "id" : 1 , "name" : "张三" , "age" : 23 }
rabbitmq. send_json( channel, user, exchange_name, "error" )
rabbitmq. send_json( channel, user, exchange_name, "info" )
connection. close( )
消费者
import rabbitmq
import json
connection = rabbitmq. get_connection( )
channel = connection. channel( )
exchange_name = "user_manager_direct"
channel. exchange_declare( exchange= exchange_name, exchange_type= rabbitmq. ExchangeType. direct,
)
result = channel. queue_declare( queue= "" , exclusive= True ,
)
queue_name = result. method. queue
channel. queue_bind( exchange= exchange_name, queue= queue_name, routing_key= "error" ,
)
channel. queue_bind( exchange= exchange_name, queue= queue_name, routing_key= "info" ,
) def callback ( ch, method, properties, body) : """每次接收到消息的消费回调方法""" print ( rabbitmq. receive_json( ch, method, body) )
rabbitmq. consume( connection, queue_name, callback)
进一步简化代码
生产者
import rabbitmq
connection = rabbitmq. get_connection( )
exchange_name = "user_manager_direct"
channel, _ = rabbitmq. get_direct_channel( connection, exchange_name)
user = { "id" : 1 , "name" : "张三" , "age" : 23 }
rabbitmq. send_json( channel, user, exchange_name, "error" )
rabbitmq. send_json( channel, user, exchange_name, "info" )
connection. close( )
消费者
import rabbitmq
import json
connection = rabbitmq. get_connection( )
exchange_name = "user_manager_direct"
channel, queue_name = rabbitmq. get_direct_channel( connection, exchange_name, [ "error" , "info" ] ) def callback ( ch, method, properties, body) : """每次接收到消息的消费回调方法""" print ( rabbitmq. receive_json( ch, method, body) )
rabbitmq. consume( connection, queue_name, callback)