Python如何操作RabbitMQ实现direct关键字发布订阅模式?有录播直播私教课视频教程

direct关键字发布订阅模式

基本用法

发布者

import json
from rabbitmq import pika
import rabbitmq# 建立连接
credentials = rabbitmq.PlainCredentials('zhangdapeng','zhangdapeng520',
)  # mq用户名和密码
connection_target = rabbitmq.ConnectionParameters(host='127.0.0.1',port=5672,virtual_host='/',credentials=credentials,
)
connection = rabbitmq.BlockingConnection(connection_target)# 队列信息
exchange_name = "user_manager_direct"# 创建管道
channel = connection.channel()# 声明一个交换机
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.direct)# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
message = json.dumps(user, ensure_ascii=True)
channel.basic_publish(exchange=exchange_name,routing_key="error",  # 这里不再是队列名了,而是关键字body=message.encode('utf8'),properties=pika.BasicProperties(delivery_mode=2),  # 声明消息在队列中持久化
)
channel.basic_publish(exchange=exchange_name,routing_key="info",  # 这里不再是队列名了,而是关键字body=message.encode('utf8'),properties=pika.BasicProperties(delivery_mode=2),  # 声明消息在队列中持久化
)
print(message)# 关闭连接
connection.close()

消费者

import rabbitmq
import json# 创建连接
credentials = rabbitmq.PlainCredentials('zhangdapeng','zhangdapeng520',
)
target = rabbitmq.ConnectionParameters(host='127.0.0.1',port=5672,virtual_host='/',credentials=credentials,
)
connection = rabbitmq.BlockingConnection(target)# 创建管道
channel = connection.channel()# 队列信息
exchange_name = "user_manager_direct"# 绑定交换机
channel.exchange_declare(exchange=exchange_name,exchange_type=rabbitmq.ExchangeType.direct,
)# 绑定队列
result = channel.queue_declare(queue="",  # 这里不要指定队列名exclusive=True,
)
queue_name = result.method.queue  # 通过result获取队列名
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key="error",  # 可以通过routing_key绑定多个关键字
)
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key="info",  # 可以通过routing_key绑定多个关键字
)def callback(ch, method, properties, body):"""每次接收到消息的消费回调方法"""ch.basic_ack(delivery_tag=method.delivery_tag)data = body.decode("utf8")print(json.loads(data))# 开始消费
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=False,
)
try:channel.start_consuming()
finally:connection.close()

简化代码

生产者

import json
from rabbitmq import pika
import rabbitmq# 建立连接
connection = rabbitmq.get_connection()# 队列信息
exchange_name = "user_manager_direct"# 创建管道
channel = connection.channel()# 声明一个交换机
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.direct)# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, "error")
rabbitmq.send_json(channel, user, exchange_name, "info")# 关闭连接
connection.close()

消费者

import rabbitmq
import json# 创建连接
connection = rabbitmq.get_connection()# 创建管道
channel = connection.channel()# 队列信息
exchange_name = "user_manager_direct"# 绑定交换机
channel.exchange_declare(exchange=exchange_name,exchange_type=rabbitmq.ExchangeType.direct,
)# 绑定队列
result = channel.queue_declare(queue="",  # 这里不要指定队列名exclusive=True,
)
queue_name = result.method.queue  # 通过result获取队列名
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key="error",  # 可以通过routing_key绑定多个关键字
)
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key="info",  # 可以通过routing_key绑定多个关键字
)def callback(ch, method, properties, body):"""每次接收到消息的消费回调方法"""print(rabbitmq.receive_json(ch, method, body))# 开始消费
rabbitmq.consume(connection, queue_name, callback)

进一步简化代码

生产者

import rabbitmq# 建立连接
connection = rabbitmq.get_connection()# 队列信息
exchange_name = "user_manager_direct"# 创建管道
channel, _ = rabbitmq.get_direct_channel(connection, exchange_name)# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, "error")
rabbitmq.send_json(channel, user, exchange_name, "info")# 关闭连接
connection.close()

消费者

import rabbitmq
import json# 创建连接
connection = rabbitmq.get_connection()# 队列信息
exchange_name = "user_manager_direct"# 创建管道
channel, queue_name = rabbitmq.get_direct_channel(connection, exchange_name, ["error", "info"])def callback(ch, method, properties, body):"""每次接收到消息的消费回调方法"""print(rabbitmq.receive_json(ch, method, body))# 开始消费
rabbitmq.consume(connection, queue_name, callback)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/622320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【金猿CIO展】步长制药信息化管理与建设中心总经理束炼:IT部门既要懂技术,也要懂业务...

‍ 束炼 本文由步长制药信息化管理与建设中心总经理束炼撰写并投递参与“数据猿年度金猿策划活动——2023大数据产业年度优秀CIO榜单及奖项”评选。 大数据产业创新服务媒体 ——聚焦数据 改变商业 随着数字化转型的浪潮席卷各行各业,中国数字经济已进入快速发展阶…

水浒传之Screen的传奇-Linux/FreeBSD终端下的多窗口复用

缘起 梁山泊大厅 宋江(梁山泊头领):各位兄弟,今日我们梁山好汉齐聚一堂,可是有大事要商量。 李逵(黑旋风):哥哥,是不是又要有新的任务了?这次我们打谁&am…

架设一台NFS服务器

1、开放/nfs/shared目录,供所有用户查询资料 2、开放/nfs/upload目录,为192.168.xxx.0/24网段主机可以上传目录, 并将所有用户及所属的组映射为nfs-upload,其UID和GID均为210 3、将/home/tom目录仅共享给192.168.xxx.xxx这台主机&#xf…

【翻译转载】What is a Self-Driving Database Management System

What is a Self-Driving Database Management System? cs.cmu.edu/~pavlo/blog/2018/04/what-is-a-self-driving-database-management-system.html#footnote-cidr 概述 一些组织和个人错误地将他们的系统标记为“自动驾驶”。真正的自动驾驶数据库管理系统(DBMS…

Mybatis 常用条件语句,大于小于、if、for、模糊搜索、case when、choose

大于小于 方法1&#xff1a; > 大于 &#xff0c; < 小于 <if test"startTime ! null ">and a.create_time > #{startTime} </if> <if test"endTime ! null ">and a.create_time < #{endTime} </if> 方法2(建议写这…

爬虫的基本原理

基本原理 可以把网页与网页之间的链接关系比作节点中的连线&#xff0c;爬虫可以根据网页中的关系获取后续的网页&#xff0c;当整个网站涉及的页面全部被爬虫访问到后&#xff0c;网站的数据就被访问下来了。 1.爬虫概述 简单点讲&#xff0c;爬虫就是获取网页并提取和保存信…

PLECS如何下载第三方库并导入MOSFET 的xml文件,xml库路径添加方法及相关问题

1. 首先xml库的下载&#xff0c;PLECS提供了一个跳转的链接。 https://www.plexim.com/download/thermal_models 2. 下载一个库&#xff08;以最后一个Wolfspeed为例&#xff0c;属于CREE的SiC MOSFET&#xff09; 下载这个就行&#xff0c;都包含了。不信自己可以试试再下载…

响应式Web开发项目教程(HTML5+CSS3+Bootstrap)第2版 例4-1 表单

代码 <!doctype html> <html> <head> <meta charset"utf-8"> <title>表单</title> </head><body> <!--<form action"URL地址" method"提交方式" name"表单名称" /*编码“多部…

升级cmake 版本方法

ubuntu18.04更新cmake版本_cmake 已经是最新版 (3.10.2-1ubuntu2.18.04.2)-CSDN博客

【QML COOK】- 008-自定义属性

前面介绍了用C定义QML类型&#xff0c;通常在使用Qt Quick开发项目时&#xff0c;C定义后端数据类型&#xff0c;前端则完全使用QML实现。而QML类型或Qt Quick中的类型时不免需要为对象增加一些属性&#xff0c;本篇就来介绍如何自定义属性。 1. 创建项目&#xff0c;并编辑Ma…

NVIDIA-SMI has failed because it couldn‘t communicate with the NVIDIA driver

安装显卡驱动 https://fakerth.blog.csdn.net/article/details/134659236 NVIDIA-SMI has failed because it couldn‘t communicate with the NVIDIA driver 电脑打开分辨率直接变了&#xff0c;运行nvidia-smi报错&#xff0c;当时我就在爆发的边缘了&#xff0c;想着大好…

C语言数据结构(1)复杂度(大o阶)

欢迎来到博主的专栏——C语言与数据结构 博主ID——代码小豪 文章目录 如何判断代码的好坏时间复杂度什么是时间复杂度如何计算时间复杂度 空间复杂度 如何判断代码的好坏 实现相同作用的不同代码&#xff0c;如何分辨这些代码的优劣之处呢&#xff1f; 有人说了&#xff0c…

算法:A*算法最小实例

A*算法主要作用是寻找两个节点之间的最短路径 首先&#xff0c;需要定义一个表示地图的二维数组&#xff0c;其中0表示可通过的空地&#xff0c;1表示障碍物&#xff1a; const map [[0, 0, 0, 0, 0],[0, 1, 1, 0, 0],[0, 0, 0, 0, 0],[0, 1, 0, 1, 0],[0, 0, 0, 0, 0]];接着…

原子类-入门介绍和分类说明、基本类型原子类

Atomic翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学 反应中是不可分割的。在我们这里Atomic是指一个操作是不可中断的。即使是在多个线程一起执 行的时候,一个操作一旦开始,就不会被其他线程干扰。 基本类型原子类 AtomicInteger:整…

在illustrator中按大小尺寸选择物体 <脚本 018>

在Illustrator中我们可以依据对象的属性 如:填充颜色、描边颜色或描边宽度来选择相同属性的对象,但是Illustrator中没有根据不同大小尺寸来选择对象的功能,下面介绍的就是根据大小尺寸选择对象的脚本。 1、下面是当前画板中的所有对象,我们想把一些在尺寸小一些的方形物体…

数据治理工程师 CDGA数据建模和设计

1. 以下选项不属于数据建模和设计治理中质量管理内容的是&#xff08;D &#xff09; A 开发数据建模和设计标准 B 评审数据模型以及数据库设计质量 C 管理数据模型版本与集成 D 评估数据模型运行效率 2. 关于实体的别名&#xff0c;以下对应关系不正确的是&#xff08;B &…

金融帝国实验室(Capitalism Lab)V10版本游戏平衡性优化与改进

即将推出的V10版本中的各种游戏平衡性优化与改进&#xff1a; ————————————— 一、当玩家被提议收购一家即将破产的公司时&#xff0c;显示商业秘密。 当一家公司濒临破产&#xff0c;玩家被提议收购该公司时&#xff0c;如果玩家有兴趣评估该公司&#xff0c;则无…

datavrap-各种各样的条形图(含详细操作步骤)

静态条形图&#xff1a;正确设置数据即可&#xff0c;导出的图形不会随着时间变化 最普通的静态条形图 黑色系风格的静态条形图 动态条形图&#xff1a;导出的图形会随着时间变化 普通的动态条形图 带数字滚动效果的动态条形图 简单的Top排行榜动态条形图 格式更丰富的Top排行榜…

联盛德-安全物联网芯片w800

联盛德-安全物联网芯片w800 前言一、w800简介&#xff1f;芯片外观 总结 前言 本文介绍w800的基本信息&#xff0c;详细规格&#xff0c;后续有时间会介绍如何适配openharmony的过程。 一、w800简介&#xff1f; W800 芯片是一款安全 IoT Wi-Fi/蓝牙 双模 SoC 芯片。支持 2.…

系统设计面试

如何处理一个系统设计的面试题 系统设计面试是一个开放式的对话。他们希望你去主导这个对话 第一步&#xff1a;描述使用场景&#xff0c;约束和假设 把所有需要的东西聚集在一起&#xff0c;审视问题。不停的提问&#xff0c;以至于我们可以明确使用场景和约束。讨论假设。 …