摘要
通过 MQTT 透传 Ollama 的 API 端点,以应对客户端与服务器之间网络连接不稳定的情况。
实现
服务端
1. 启动mqtt broker(也可以使用公共broker)
- 监听 mqtt://0.0.0.0:8907(本机可通过 mqtt://127.0.0.1:8907 访问),允许匿名登录;端口对外开放时任何人都能借此调用 Ollama,请仅在可信网络中使用或开启认证。
# Pull the Eclipse Mosquitto broker image (through the daocloud mirror).
docker pull m.daocloud.io/docker.io/eclipse-mosquitto:latest
# Host-side config file; hand the directory to the invoking user so the
# heredoc below can write it without sudo.
sudo mkdir -p /home/server/mosquitto
sudo touch /home/server/mosquitto/mosquitto.conf
sudo chown -R "$USER" /home/server/mosquitto
# Listen on every interface on port 8907 and accept anonymous clients.
# NOTE: anyone who can reach this port can drive Ollama through the bridge;
# restrict it with a firewall/VPN or enable authentication.
cat > /home/server/mosquitto/mosquitto.conf <<'EOD'
listener 8907 0.0.0.0
protocol mqtt
allow_anonymous true
EOD
docker run --restart=always -p 8907:8907 -v "/home/server/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf" -d m.daocloud.io/docker.io/eclipse-mosquitto:latest
2. 配置mqtt透传转发ollama
命令:
# Create the uv project and pin the interpreter the bridge is written for.
uv init
uv python pin 3.13
# Runtime dependencies of mqtt_bridge_ollama.py.
uv add aiohttp paho-mqtt
# Write the bridge script (contents below).
vim mqtt_bridge_ollama.py
代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文件: mqtt_bridge_ollama.py
功能: MQTT 透明代理 127.0.0.1:8907 <-> Ollama 127.0.0.1:11435
用法: python mqtt_bridge_ollama.py
"""
import asyncio
import json
import logging
import signal
import sys
import time
from datetime import datetime
from typing import Any, Dict, Optional

import aiohttp
import paho.mqtt.client as mqtt

# ========== 配置 ==========
# Ollama HTTP endpoint. (Ollama's stock port is 11434; this deployment uses 11435.)
OLLAMA_BASE = "http://127.0.0.1:11435"
# Broker the bridge connects to, as scheme://host:port.
MQTT_BROKER = "tcp://127.0.0.1:8907"
# Requests arrive on ROOT_TOPIC/<method>/<path>; replies go to ROOT_TOPIC/response/<req_id>.
ROOT_TOPIC = "/api/ollama"
# QoS used for both the subscription and every published reply.
QOS = 1
# ==========================

logging.basicConfig(
    level=logging.INFO,
    format="[mqtt-ollama] %(asctime)s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)

# ---------- helpers ----------
def gen_id() -> str:
    """Return a fallback request id: current wall-clock time in whole microseconds, as decimal digits."""
    micros = datetime.now().timestamp() * 1_000_000
    return f"{int(micros)}"


# ---------- MQTT ----------
class Bridge:
    """Relay Ollama HTTP API calls carried over MQTT and stream the replies back.

    Requests arrive on ``{ROOT_TOPIC}/<method>/<path...>`` (for example
    ``/api/ollama/post/api/generate``). The payload, if present, should be a JSON
    object. Its optional ``_req_id`` key is removed and selects the reply topic,
    and the remaining keys become the JSON request body. Every non-empty line of
    the HTTP response body is published as one message on
    ``{ROOT_TOPIC}/response/<req_id>``. Forwarding failures are published there as
    ``{"error": ..., "done": true}`` so clients stop waiting.
    """

    # Reply sub-topic. It lies inside our own ``{ROOT_TOPIC}/#`` subscription, so
    # messages on it must be ignored. Forwarding them would turn every reply into
    # a new HTTP request, whose reply is forwarded again, endlessly.
    _RESPONSE_SEGMENT = "response"

    def __init__(self) -> None:
        self._mqtt: Optional[mqtt.Client] = None
        self._session: Optional[aiohttp.ClientSession] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # Set by a signal or by stop(); _start() waits on it.
        self._stop_event: Optional[asyncio.Event] = None

    # ---------- lifecycle ----------
    def run(self) -> None:
        """Run the bridge until SIGINT/SIGTERM (or stop()), then clean up."""
        # asyncio.run also cancels leftover tasks and closes the loop on exit.
        asyncio.run(self._start())

    async def stop(self) -> None:
        """Ask run() to return, and release the MQTT client and the HTTP session.

        Safe to call more than once: later calls find nothing left to release.
        """
        if self._stop_event is not None:
            self._stop_event.set()
        # Detach before disconnecting so a concurrent call cannot disconnect twice.
        client, self._mqtt = self._mqtt, None
        if client is not None:
            client.disconnect()
            client.loop_stop()
        if self._session is not None and not self._session.closed:
            await self._session.close()

    async def _start(self) -> None:
        """Create the HTTP session, connect to MQTT and wait until asked to stop."""
        self._loop = asyncio.get_running_loop()
        self._stop_event = asyncio.Event()
        self._install_signal_handlers()
        # total=None: a streamed generation may legitimately run for minutes.
        self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None))
        try:
            await self._connect_mqtt()
            await self._stop_event.wait()
        finally:
            await self.stop()

    def _install_signal_handlers(self) -> None:
        """Route SIGINT/SIGTERM to _on_signal, executed on the event-loop thread."""
        loop = self._loop
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                # Callbacks registered here may safely interact with the loop;
                # plain signal.signal() handlers may not.
                loop.add_signal_handler(sig, self._on_signal)
            except NotImplementedError:
                # Windows event loops have no add_signal_handler.
                signal.signal(sig, lambda *_: loop.call_soon_threadsafe(self._on_signal))

    def _on_signal(self) -> None:
        """Log the signal and release _start(), which then performs the cleanup."""
        log.info("收到退出信号")
        if self._stop_event is not None:
            self._stop_event.set()

    async def _connect_mqtt(self) -> None:
        """Connect to MQTT_BROKER, retrying every 5 s until success or a stop request.

        After the first success, paho's loop_start() thread handles reconnects,
        and _on_connect re-subscribes each time it connects.
        """
        from urllib.parse import urlsplit

        broker = urlsplit(MQTT_BROKER)
        host = broker.hostname or "127.0.0.1"
        port = broker.port or 1883  # 1883 is the standard MQTT port
        while not self._stop_event.is_set():
            client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="mqtt-ollama-bridge")
            client.on_connect = self._on_connect
            try:
                # connect() blocks on the TCP handshake; keep the event loop responsive.
                await asyncio.to_thread(client.connect, host, port, 60)
            except Exception as e:
                log.error("连接失败: %s ,5s 后重试", e)
                try:
                    # Sleep 5 s, but wake immediately if a stop is requested.
                    await asyncio.wait_for(self._stop_event.wait(), timeout=5)
                except asyncio.TimeoutError:
                    pass
                continue
            # Publish self._mqtt before callbacks can fire on the network thread.
            self._mqtt = client
            client.loop_start()
            log.info("MQTT 连接成功")
            return

    # ---------- MQTT callbacks (paho network thread) ----------
    def _on_connect(
        self,
        client: mqtt.Client,
        _userdata: Any,
        _flags: Any,
        reason_code: Any = None,
        properties: Any = None,
    ) -> None:
        """Subscribe to every request topic once the broker has accepted us."""
        if getattr(reason_code, "is_failure", False):
            log.error("MQTT 连接被拒绝: %s", reason_code)
            return
        topic = f"{ROOT_TOPIC}/#"
        # Register the callback before subscribing so no early message misses it.
        client.message_callback_add(topic, self._on_message)
        client.subscribe(topic, qos=QOS)
        log.info("订阅成功: %s", topic)

    def _on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage) -> None:
        """Skip our own replies; hand everything else to the asyncio loop."""
        if msg.topic.startswith(f"{ROOT_TOPIC}/{self._RESPONSE_SEGMENT}/"):
            return
        loop = self._loop
        if loop is None or loop.is_closed():
            return
        try:
            future = asyncio.run_coroutine_threadsafe(self._handle(msg), loop)
        except RuntimeError:  # loop shut down between the check and the call
            return
        future.add_done_callback(self._log_handler_failure)

    @staticmethod
    def _log_handler_failure(future: Any) -> None:
        """Surface exceptions from _handle that would otherwise vanish with the future."""
        if not future.cancelled() and future.exception() is not None:
            log.error("处理消息失败: %s", future.exception())

    # ---------- request handling (asyncio loop) ----------
    async def _handle(self, msg: mqtt.MQTTMessage) -> None:
        """Parse ``<method>/<path>`` and the JSON body from *msg*, then forward it."""
        topic = msg.topic
        log.info("收到 topic=%s", topic)
        # topic layout: {ROOT_TOPIC}/<method>/<path...>
        suffix = topic[len(ROOT_TOPIC) + 1 :]
        method, sep, path = suffix.partition("/")
        if not sep or not method:
            log.warning("topic 格式错误")
            return
        try:
            payload: Any = json.loads(msg.payload) if msg.payload else {}
        except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass it
            payload = {}
        if not isinstance(payload, dict):
            log.warning("payload 不是 JSON 对象,按空 body 处理")
            payload = {}
        req_id: str = str(payload.pop("_req_id", None) or gen_id())
        try:
            await self._forward(method, path, req_id, payload)
        except Exception as e:
            log.error("转发错误: %s", e)
            # "done" lets clients that wait for it stop immediately.
            self._publish(req_id, {"error": str(e), "done": True})

    async def _forward(self, method: str, path: str, req_id: str, body: Dict[str, Any]) -> None:
        """Send the HTTP request to Ollama and publish each response line as it arrives."""
        url = f"{OLLAMA_BASE}/{path}"
        headers = {"Content-Type": "application/json"} if body else {}
        async with self._session.request(method, url, json=body or None, headers=headers) as resp:
            async for line in self._iter_lines(resp.content):
                self._publish_line(req_id, line)

    @staticmethod
    async def _iter_lines(content: Any):
        """Yield the newline-separated lines (without b"\\n") of an aiohttp stream.

        Unlike ``async for line in resp.content`` this has no per-line size cap.
        aiohttp's readline raises ValueError("Chunk too big") for long lines, such
        as a large non-streamed JSON reply.
        """
        buf = bytearray()
        async for chunk in content.iter_any():
            scan_from = len(buf)  # bytes before this were already searched
            buf += chunk
            while (nl := buf.find(b"\n", scan_from)) >= 0:
                yield bytes(buf[:nl])
                del buf[: nl + 1]
                scan_from = 0
        if buf:
            yield bytes(buf)

    def _publish_line(self, req_id: str, raw: bytes) -> None:
        """Publish one non-empty response line, dropping a trailing CR."""
        line = raw.rstrip(b"\r\n")
        if line:
            # errors="replace": one bad byte must not abort the rest of the stream.
            self._publish(req_id, line.decode("utf-8", errors="replace"))

    def _publish(self, req_id: str, data: Any) -> None:
        """Publish *data* on the reply topic; non-str values are JSON-encoded."""
        client = self._mqtt
        if client is None:
            log.warning("MQTT 未连接,丢弃响应 req_id=%s", req_id)
            return
        topic = f"{ROOT_TOPIC}/{self._RESPONSE_SEGMENT}/{req_id}"
        payload = data if isinstance(data, str) else json.dumps(data)
        client.publish(topic, payload, qos=QOS)


# ---------- main ----------
if __name__ == "__main__":
    # Script entry point: build the bridge and block until it is stopped.
    bridge = Bridge()
    bridge.run()
运行:
# Run detached; the bridge logs to stderr, so send both streams to a named log file.
nohup uv run mqtt_bridge_ollama.py > mqtt_bridge_ollama.log 2>&1 &
客户端
1. 测试连接mqtt话题
命令:
uv init
uv python pin 3.13
# The test client only talks MQTT; aiohttp is needed only by the server-side bridge.
uv add paho-mqtt
vim test_mqtt_ollama.py
代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试:MQTT 透传 → Ollama 流式生成 Rust 代码
用法: python test_mqtt_ollama.py
"""
import json
import queue
import threading
import uuid

import paho.mqtt.client as mqtt

MQTT_HOST = "10.8.8.130"
MQTT_PORT = 8907
ROOT_TOPIC = "/api/ollama"  # must match ROOT_TOPIC in mqtt_bridge_ollama.py
IDLE_TIMEOUT = 10  # seconds to wait for SUBACK, and between stream packets

# Unique per run: the reply topic is derived from it, so concurrent runs must not
# share it (a 4-digit random number collides easily).
req_id = f"rust_demo_{uuid.uuid4().hex[:12]}"
REPLY_TOPIC = f"{ROOT_TOPIC}/response/{req_id}"

q: "queue.Queue[bytes]" = queue.Queue()
subscribed = threading.Event()


def on_connect(cli, _ud, _flags, rc, _properties=None):
    """(Re)subscribe to this run's reply topic whenever the broker accepts us."""
    if rc == 0:
        print("✅ MQTT 连接成功")
        cli.subscribe(REPLY_TOPIC, qos=1)
    else:
        print("❌ 连接失败,rc =", rc)


def on_subscribe(_cli, _ud, _mid, reason_codes, _properties=None):
    """Tell the main thread the reply topic is live, so no reply can be missed."""
    if any(not code.is_failure for code in reason_codes):
        subscribed.set()
    else:
        print("❌ 订阅失败:", reason_codes)


def on_message(_cli, _ud, msg):
    """Hand raw payloads over to the main thread."""
    q.put(msg.payload)


def _print_stream() -> None:
    """Print the "response" field of each streamed packet until done, error or idle timeout."""
    print("\n--- 流式回答开始 ---")
    while True:
        try:
            pkt = q.get(timeout=IDLE_TIMEOUT)
        except queue.Empty:
            print(f"\n⚠️ {IDLE_TIMEOUT}s 没收到新包,退出")
            break
        if not pkt:
            continue
        try:
            data = json.loads(pkt.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue  # non-JSON packets are dropped
        if not isinstance(data, dict):
            continue
        if "error" in data:
            # Both Ollama and the bridge report failures as {"error": ...}.
            print(f"\n❌ 服务端错误: {data['error']}")
            break
        print(data.get("response", ""), end="", flush=True)
        if data.get("done"):
            break
    print("\n--- 流式回答结束 ---")


def main() -> None:
    """Send one streamed /api/generate request through the bridge and print the answer."""
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_subscribe = on_subscribe
    client.on_message = on_message
    client.connect(MQTT_HOST, MQTT_PORT, 60)
    client.loop_start()
    try:
        # Wait for SUBACK rather than a fixed sleep. On a slow link the bridge could
        # otherwise answer before the subscription exists, and those chunks are lost.
        if not subscribed.wait(timeout=IDLE_TIMEOUT):
            print(f"\n⚠️ {IDLE_TIMEOUT}s 内未完成订阅,退出")
            return
        while not q.empty():  # drop anything stale before sending the request
            q.get_nowait()
        payload = {
            "_req_id": req_id,
            "model": "modelscope.cn/Qwen/Qwen3-30B-A3B-GGUF:Qwen3-30B-A3B-Q8_0.gguf",
            "prompt": "请只输出一段最简 Rust 代码,打印 hello ollama,不要任何解释。",
            "stream": True,
        }
        client.publish(f"{ROOT_TOPIC}/post/api/generate", json.dumps(payload), qos=1)
        _print_stream()
    finally:
        client.disconnect()
        client.loop_stop()


if __name__ == "__main__":
    main()
go版本:
package mainimport ("encoding/json""fmt""log""math/rand""time"mqtt "github.com/eclipse/paho.mqtt.golang"
)const (MQTT_HOST = "10.8.8.130"MQTT_PORT = 8907ROOT_TOPIC = "/api/ollama"
)func main() {// 生成随机请求 IDreqId := fmt.Sprintf("rust_demo_%d", rand.Intn(9000)+1000)// 创建消息队列(用带缓冲的 channel 模拟)msgQueue := make(chan mqtt.Message, 10)// MQTT 连接选项opts := mqtt.NewClientOptions()opts.AddBroker(fmt.Sprintf("tcp://%s:%d", MQTT_HOST, MQTT_PORT))opts.SetClientID(reqId)opts.SetAutoReconnect(true)// 连接成功回调:订阅响应主题opts.SetOnConnectHandler(func(c mqtt.Client) {fmt.Println("✅ MQTT 连接成功")topic := fmt.Sprintf("%s/response/%s", ROOT_TOPIC, reqId)if token := c.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil {log.Fatal(token.Error())}})// 消息到达回调:写入队列opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {msgQueue <- m})// 连接并启动网络循环client := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {log.Fatal("❌ 连接失败:", token.Error())}defer client.Disconnect(250)// 等待连上并清空残留消息time.Sleep(500 * time.Millisecond)for len(msgQueue) > 0 {<-msgQueue}// 构造并发送请求payload := map[string]interface{}{"_req_id": reqId,"model": "modelscope.cn/Qwen/Qwen3-30B-A3B-GGUF:Qwen3-30B-A3B-Q8_0.gguf","prompt": "请只输出一段最简 Rust 代码,打印 hello ollama,不要任何解释。","stream": true,}jsonBytes, _ := json.Marshal(payload)topic := fmt.Sprintf("%s/post/api/generate", ROOT_TOPIC)token := client.Publish(topic, 1, false, jsonBytes)token.Wait()// 收流式回答fmt.Println("\n--- 流式回答开始 ---")done := falsefor !done {select {case msg := <-msgQueue:// 只提取并打印 response 字段var data map[string]interface{}if err := json.Unmarshal(msg.Payload(), &data); err != nil {// 非 JSON 包直接丢弃continue}fmt.Print(data["response"])if v, ok := data["done"].(bool); ok && v {done = true}case <-time.After(10 * time.Second):fmt.Println("\n⚠️ 10s 没收到新包,退出")return}}fmt.Println("\n--- 流式回答结束 ---")
}
输出:
MQTT 连接成功

--- 流式回答开始 ---
<think>
好的,用户让我提供一个最简的Rust代码,打印“hello ollama”,而且不要任何解释。首先,我需要确认用户的需求是什么。他们可能想要一个非常基础的示例,可能用于测试环境或者快速演示。Rust的Hello World通常使用println!宏。所以最简单的代码应该是fn main() { println!("hello ollama"); }。不过用户要求最简,可能需要更简短的写法。但Rust的语法必须有函数定义,所以fn main()是必须的。有没有可能更简?比如使用main函数的另一种形式?或者有没有其他方式?比如使用宏展开?不过可能不会更简。比如,直接写println!会报错,因为需要在函数体内。所以必须有main函数。那正确的最简代码应该是fn main(){println!("hello ollama");}。去掉空格的话,可能更简,但用户可能希望有适当的格式。不过用户要求最简,所以可能不需要空格。比如fn main(){println!("hello ollama");}这样。检查是否有其他可能的错误。比如,是否需要use std::println;?不过println!是宏,通常自动引入。所以不需要额外的use语句。因此,这段代码应该可以直接运行。确认用户不要任何解释,所以只需要输出代码。所以最终答案应该是这个代码段。
</think>fn main(){println!("hello ollama");}
--- 流式回答结束 ---