【星海随笔】RabbitMQ开发篇 - 教程

news/2025/9/24 20:15:09/文章来源:https://www.cnblogs.com/lxjshuju/p/19109893

生产消息

import pika
import json
import time
import logging
from typing import Dict, Any
import argparse
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustRabbitMQProducer
:
def __init__(self, host='localhost', port=5672, vhost='vhost_test' ,username='rabbitmq', password='rabbitmq@123',connection_attempts=3,aliyun_object_name="None"):
self.host = host
self.port = port
self.username = username
self.password = password
self.vhost = vhost
self.connection_attempts = connection_attempts
self.connection = None
self.channel = None
self.aliyun_obj = aliyun_object_name
self.connect()
def connect(self):
"""建立连接"""
try:
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host,
port=self.port,
credentials=pika.PlainCredentials(self.username, self.password),
virtual_host=self.vhost,
connection_attempts=self.connection_attempts,
heartbeat=600,
blocked_connection_timeout=300
)
)
self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange='unlock_rclone_topic_exchange', # 交换机名称
exchange_type='topic', # 交换机类型(与路由键匹配)
durable=True, # 持久化(重启后不丢失)
auto_delete=False # 不自动删除
)
logger.info("✅ 成功连接到RabbitMQ")
except Exception as e:
logger.error(f"❌ 连接RabbitMQ失败: {e
}")
msg=f"❌ 连接RabbitMQ失败: {e
}"
self.warning_send_to_wechat(record_msg=msg,obj_name=f"{self.aliyun_obj
}")
raise
def warning_send_to_wechat(self,record_msg=None,obj_name=None):
import requests
import json
url = "《机器人URL》"
# 构建正确的JSON payload
payload = {
"msgtype": "text",
"text": {
"content": f"阿里云OSS解锁记录失败: {record_msg
}\n 对象: {obj_name
}",
"mentioned_list": ["@ user_ID"]
}
}
headers = {
'Content-Type': 'application/json' # 修正Content-Type
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应
if response.status_code == 200:
msg="企业微信通知发送成功"
print(msg)
#return True,msg
else:
msg=f"企业微信通知发送失败: {record_msg,obj_name
} "
print(msg)
def ensure_connection(self):
"""确保连接有效"""
if self.connection is None or self.connection.is_closed:
logger.warning("连接已关闭,尝试重新连接...")
self.connect()
def send_message(self, exchange: str, routing_key: str, message: Dict[str, Any],
persistent: bool = True, retry_count: int = 3):
"""发送消息(带重试机制)"""
for attempt in range(retry_count):
try:
self.ensure_connection()
properties = pika.BasicProperties(
delivery_mode=2 if persistent else 1, # 2=持久化,1=非持久化
content_type='application/json',
timestamp=int(time.time())
)
self.channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=json.dumps(message),
properties=properties,
mandatory=True # 确保消息被路由到队列
)
logger.info(f" 消息发送成功: {exchange
} -> 
{routing_key
}")
return True
except pika.exceptions.UnroutableError:
self.warning_send_to_wechat(record_msg=f"⚠️ 消息无法路由: {routing_key
}",obj_name=f"{message
}")
logger.warning(f"⚠️ 消息无法路由: {routing_key
}")
return False
except Exception as e:
self.warning_send_to_wechat(record_msg=f"❌ 发送失败 尝试 {attempt + 1
}/{retry_count
}: {e
}",obj_name=f"{message
}")
logger.error(f"❌ 发送失败 (尝试 {attempt + 1
}/{retry_count
}): {e
}")
if attempt < retry_count - 1:
time.sleep(2) # 等待后重试
else:
raise
def close(self):
"""安全关闭连接"""
if self.connection and not self.connection.is_closed:
self.connection.close()
logger.info(" 连接已关闭")
def parse_args():
"""
解析命令行入口参数,定义用户可传递的参数规则
"""
parser = argparse.ArgumentParser(
description="RabbitMQ消息发送脚本:支持用户传递rclone相关的对象名及消息内容",
formatter_class=argparse.RawTextHelpFormatter # 支持换行显示帮助信息
)
# 1. 必传核心参数:rclone用到的对象名
parser.add_argument(
"--aliyun-obj", # 参数名(长选项)
"-a", # 短选项
required=True, # 必传
type=str,
help="阿里云对象存储的对象名(rclone操作目标,例如:oss://my-bucket/path)"
)
parser.add_argument(
"--minio-obj",
"-m",
required=True,
type=str,
help="MinIO对象存储的对象名(rclone操作目标,例如:minio://my-bucket/path)"
)
# 2. 消息内容参数:支持用户自定义(提供默认值,可选传)
parser.add_argument(
"--message-ali",
"-ma",
type=str,
default='{"service": "aliyun", "action": "copy"}',
help='阿里云相关消息体(JSON格式字符串)\n'
'示例:\'{"service":"aliyun","action":"copy","files":["data.csv"]}\'\n'
'默认值:{"service": "aliyun", "action": "copy"}'
)
parser.add_argument(
"--message-minio",
"-mm",
type=str,
default='{"tool": "rclone", "action": "copy", "source": "/tmp"}',
help='MinIO相关消息体(JSON格式字符串)\n'
'示例:\'{"tool":"rclone","action":"move","source":"/data"}\'\n'
'默认值:{"tool": "rclone", "action": "copy", "source": "/tmp"}'
)
# 3. 可选参数:RabbitMQ交换器和路由键(如需灵活配置可开放,这里用默认值)
parser.add_argument(
"--exchange",
"-e",
type=str,
default="unlock_rclone_topic_exchange",
help="RabbitMQ主题交换器名称(默认:unlock_rclone_topic_exchange)"
)
parser.add_argument(
"--routing-key-ali",
"-rka",
type=str,
default="aliV4.sync",
help="阿里云消息的路由键(默认:aliV4.sync)"
)
parser.add_argument(
"--routing-key-minio",
"-rkm",
type=str,
default="rclone.copy",
help="MinIO消息的路由键(默认:rclone.copy)"
)
return parser.parse_args()
def validate_and_parse_json(json_str: str, param_name: str) ->
dict:
"""
校验并解析JSON格式的字符串为字典
:param json_str: 待解析的JSON字符串
:param param_name: 参数名(用于报错提示)
:return: 解析后的字典
"""
import json # 局部导入(仅用到时加载)
try:
return json.loads(json_str)
except json.JSONDecodeError as e:
raise ValueError(f"参数【{param_name
}】格式错误,必须是合法JSON字符串:{e
}")
def build_messages(args) ->
list:
"""
根据用户传入的参数,构建要发送的RabbitMQ消息列表
:param args: 解析后的命令行参数
:return: 消息列表
"""
# 解析JSON格式的消息体
msg_ali = validate_and_parse_json(args.message_ali, "--message-ali")
msg_minio = validate_and_parse_json(args.message_minio, "--message-minio")
# 构建消息(可根据需求将 aliyun-obj/minio-obj 嵌入消息体)
# 注意:这里默认将对象名加入消息体的"target"字段,如需调整位置可修改
messages = [
{
"exchange": args.exchange,
"routing_key": args.routing_key_ali,
"message": {
**msg_ali, "target": args.aliyun_obj
} # 合并用户消息与对象名
},
{
"exchange": args.exchange,
"routing_key": args.routing_key_minio,
"message": {
**msg_minio, "target": args.minio_obj
} # 合并用户消息与对象名
}
]
return messages
def main():
# 1. 解析命令行参数
args = parse_args()
# 2. 初始化RabbitMQ生产者
producer = RobustRabbitMQProducer(aliyun_object_name=args.aliyun_obj)
try:
# 3. 构建消息列表
messages_to_send = build_messages(args)
print(f" 待发送消息列表:{messages_to_send
}")
# 4. 发送消息
for idx, msg in enumerate(messages_to_send, start=1):
producer.send_message(
exchange=msg["exchange"],
routing_key=msg["routing_key"],
message=msg["message"]
)
print(f"✅ 第{idx
}条消息发送成功")
time.sleep(0.5) # 避免消息发送过快
print("\n 所有消息发送完成!")
except ValueError as ve:
# 捕获参数校验错误(如JSON格式错误)
print(f"❌ 参数错误: {ve
}")
except Exception as e:
# 捕获其他运行时错误(如RabbitMQ连接失败)
print(f" 发送过程中出错: {
str(e)
}")
finally:
# 确保生产者连接关闭
if "producer" in locals(): # 避免未初始化时调用close()
producer.close()
print(" RabbitMQ生产者连接已关闭")
if __name__ == "__main__":
main()

执行

python send_message.py -a <ali-bucket>://<object_all_path>-m <other-App>://<object_all_path>-mm '{"tool": "rclone", "action": "copy", "source": "<object_name_all_path>"}'
customer.py 消费者
#!/usr/bin/env python3
import pika
import json
import argparse
from typing import Optional, Dict, Any
class RabbitMQMessageReader
:
def __init__(self, host: str = 'localhost', port: int = 5672,
username: str = 'Account', password: str = 'Password',
vhost: str = 'vhost_prod'):
self.connection_params = pika.ConnectionParameters(
host=host,
port=port,
virtual_host=vhost,
credentials=pika.PlainCredentials(username, password),
heartbeat=600,
blocked_connection_timeout=300
)
def read_single_message(self, queue_name: str) -> Optional[Dict[str, Any]]:
"""读取单条消息但不删除"""
connection = None
try:
connection = pika.BlockingConnection(self.connection_params)
channel = connection.channel()
# 声明队列(确保队列存在)
channel.queue_declare(queue=queue_name, passive=True)
# 获取消息
method_frame, properties, body = channel.basic_get(
queue=queue_name,
auto_ack=False
)
if method_frame:
message_info = {
'delivery_tag': method_frame.delivery_tag,
'routing_key': method_frame.routing_key,
'exchange': method_frame.exchange,
'redelivered': method_frame.redelivered,
'message_count': method_frame.message_count,
'body': body
}
# 拒绝消息并重新入队
channel.basic_reject(method_frame.delivery_tag, requeue=True)
return message_info
else:
print("队列为空")
return None
except pika.exceptions.ChannelClosedByBroker as e:
print(f"队列不存在或无法访问: {e
}")
except Exception as e:
print(f"错误: {e
}")
finally:
if connection and not connection.is_closed:
connection.close()
return None
def decode_message_body(self, body: bytes) -> Any:
"""解码消息体"""
try:
text = body.decode('utf-8')
# 尝试解析为 JSON
if text.strip().startswith('{') or text.strip().startswith('['):
return json.loads(text)
return text
except UnicodeDecodeError:
return body # 返回原始字节
except json.JSONDecodeError:
return text # 返回文本
def print_message(self, message_info: Dict[str, Any]):
"""美化打印消息"""
print("=" * 60)
print("RabbitMQ 消息详情")
print("=" * 60)
print(f"消息ID: {message_info['delivery_tag']
}")
print(f"路由键: {message_info['routing_key']
}")
print(f"交换器: {message_info['exchange']
}")
print(f"重投递: {message_info['redelivered']
}")
print(f"队列剩余消息: {message_info['message_count']
}")
print("\n消息内容:")
decoded_body = self.decode_message_body(message_info['body'])
if isinstance(decoded_body, dict):
print(json.dumps(decoded_body, ensure_ascii=False, indent=2))
else:
print(decoded_body)
print("\n消息已重新放回队列")
def main():
parser = argparse.ArgumentParser(description='读取 RabbitMQ 消息(不删除)')
parser.add_argument('--queue', required=True, help='队列名称')
parser.add_argument('--host', default='localhost', help='RabbitMQ 主机')
parser.add_argument('--port', type=int, default=5672, help='RabbitMQ 端口')
parser.add_argument('-u', '--username', default='Account', help='用户名')
parser.add_argument('-p', '--password', required=True, help='密码')
parser.add_argument('-V', '--vhost', default='vhost_prod', help='虚拟主机')
args = parser.parse_args()
reader = RabbitMQMessageReader(
host=args.host,
port=args.port,
username=args.username,
password=args.password,
vhost=args.vhost
)
message = reader.read_single_message(args.queue)
if message:
reader.print_message(message)
if __name__ == '__main__':
# 使用示例
# python3 script.py --queue ali_queue -p Password
main()

消费者确认

手动确认的三种方式:
basic_ack(delivery_tag) - 确认并删除消息

basic_reject(delivery_tag, requeue=True) - 拒绝并重新入队

basic_reject(delivery_tag, requeue=False) - 拒绝并丢弃

例如:

def manual_ack_example():
credentials = pika.PlainCredentials('Account', 'Password')
parameters = pika.ConnectionParameters(
host='localhost',
virtual_host='vhost_prod',
credentials=credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 获取消息,不自动确认
method_frame, header_frame, body = channel.basic_get(
queue='ali_queue',
auto_ack=False # 重要:手动确认模式
)
if method_frame:
print(f"收到消息: {body.decode()
}")
# 手动确认消息(从队列中删除)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
print("消息已确认并删除")
# 或者拒绝消息并重新入队
# channel.basic_reject(delivery_tag=method_frame.delivery_tag, requeue=True)
# print("消息已拒绝并重新入队")
else:
print("没有消息")
connection.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/916190.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手机网站建设要注意哪些问题搜索关键词

在Ionic框架中&#xff0c;头部&#xff08;Header&#xff09;和底部&#xff08;Footer&#xff09;是重要的UI组件&#xff0c;它们分别固定在屏幕的顶部和底部。 头部&#xff08;Header&#xff09; 头部组件通常用于显示应用程序的标题、副标题和导航按钮。它是应用程序…

有域名就可以做网站么动漫设计专业属于什么大类

记得几年前想要在Android手机上截图&#xff0c;得安装类似截图软件与Root 才行&#xff0c;层层的关卡还真不是一般使用者能处理的&#xff0c;如今Android手机大部分都已内置截图功能&#xff0c;对于我撰写App文章来说帮助很大&#xff0c;但有时想要表现哪台手机外框画面时…

买app的网站建设做网站卖产品

当涉及Java编程和执行时&#xff0c;以下术语具有不同的含义&#xff1a; 1.JRE (Java Runtime Environment) JRE是Java运行时环境的缩写。它是一个包含用于在计算机上运行Java应用程序所需的组件集合。JRE包括了以下几个主要部分&#xff1a; Java虚拟机(JVM)&#xff1a;用…

找做金融的网站电商网站设计流程

1. 文件上传到本地 需求分析 在用户更换头像或发布文章时&#xff0c;需要携带一个图片的 url 地址&#xff0c;该 url 地址是当用户访问文件上传接口&#xff0c;将图片上传成功后&#xff0c;服务器返回的地址。所以&#xff0c;后台需要提供一个文件上传接口&#xff0c;用…

侯马建设规划局网站学校网站建设团队

problem statement 这道题给我们一个不超过15位的整数 让我们在其中加等号或者加号 如果加完符号后符合计算结果 那么计数 最后输出所有的情况 problem analysis 这道题其实就是想办法遍历所有情况 把合法的记录下来 最终输出就得到解 如何枚举呢 我们可以枚举等号的位置…

分库分表后如何高效处理分页

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

Playwright MCP 服务器对比高层级的 MCP 服务器解决方案 - 详解

Playwright MCP 服务器对比高层级的 MCP 服务器解决方案 - 详解2025-09-24 20:07 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !impo…

详细介绍:【Selenium】UI自动化测试框架设计:从项目结构到Base-Page层的最佳实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

P13754 【MX-X17-T3】Distraction

原题链接:P13754 【MX-X17-T3】Distraction - 洛谷 非常好的题,非常好的思想。简单思想的结合体就是不易察觉的难题。这题实际上就两个难点:1. 处理每个点的权值 \(v_i\)。2. 推导交换权值并找出最长字段和 首先对于…

台州自助建站系统热门国际新闻

从Manus到OpenManus&#xff1a;AI智能体技术如何重塑未来生活场景&#xff1f; 一、现状&#xff1a;AI智能体技术面临的三大核心矛盾 &#xff08;通过分析用户高频痛点与市场反馈提炼&#xff09; 能力与门槛的失衡 Manus展示的复杂任务处理能力&#xff08;如股票分析、代…

2025.9.24

今天上早八离散数学,认真听课发现真听懂了,然后是马克思主义基本原理,老师话很亲切,中午吃饭没休息,又出发修改表格格式了,弄了3个小时,外卖被偷了,我又点了一份,然后洗澡,没带洗发水用香皂洗头,效果不好,…

初学汇编

寄存器 存储数据速度:cpu > 内存 > 硬盘通用寄存器 寄存器是在cpu中的8位 16位 32位EAX AX ALEBX BX BLECX CX CLEDX DX DLESP SP AHEBP BP CHESI SI DHEDI DI BH内存地址的五种形式 1.立即数:如0x13FFC4 2.[r…

架构图设计还得是华为 - 智慧园区

在数字化时代,架构图就像建筑工程的设计蓝图,是技术系统从抽象想法落地为实际产品的关键桥梁。无论是手机芯片的内部逻辑布局,还是全球通信网络的节点连接,清晰、科学的架构图都能让复杂的技术体系变得“可视化”,…

解决zsh: corrupt history file /home/sgud4h5gh/.zsh_history的办法

问题 在一次重装Ubuntu随后进行了一些修改的情况下,输入命令会出现报错zsh: corrupt history file /home/sgud4h5gh/.zsh_history 并且好像不能执行,实际上是因为文件.zsh_history出现损坏或者乱码的情况。 首先要知…

StarRocks GitHub 工作流程

StarRocks项目遵循GitHub工作流规范,其中包含若干实用建议(例如保持本地环境与上游仓库同步并及时提交)。本文档详细说明在GitHub平台完成StarRocks开发的完整工作流程。 第一步:云端分叉项目访问:https://github…

【Selenium】消除Selenium报错:ChromeDriver与Chrome浏览器版本不匹配

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

伍佰亿搜索引擎网站系统企业科技网站建设

如何在apache Arrow定位与解决问题 最近在执行sql时做了一些batch变更&#xff0c;出现了一个 crash问题&#xff0c;底层使用了apache arrow来实现。本节将会从0开始讲解如何调试STL源码crash问题&#xff0c;在这篇文章中以实际工作中resize导致crash为例&#xff0c;引出如何…

2013网站建设方案沃尔玛商城

本人使用谷歌搜索了简中互联网&#xff0c;完全没有找到任何有关 ANAME 的文章……本文该不会是头一份吧 相信大家对于 DNS 的解析方式都不陌生&#xff0c;常见的有 A、CNAME、MX、TXT 记录等等。其中&#xff0c;网站常用的是 A 记录和 CNAME 记录&#xff1a;A 记录用于将域…

对象初始化器的使用方法

1. 什么是对象初始化器? 对象初始化器就是一种在创建对象的同时,直接给属性赋值的写法。不用先 new 一个对象,再一行一行地赋值。 它的作用是用来给对象初始化的 一定存在:必然要执行构造方法 2. 构造方法与对象初…

C++、Java 和 Python 在输入输出差别

C++、Java 和 Python 在输入输出(I/O)格式上有显著差异,主要体现在语法风格、处理方式和灵活性上。以下从标准输入输出、文件操作两个维度对比三者的差异,并结合示例说明核心特点。 一、标准输入输出(控制台 I/O)…