如何使用 Solana Yellowstone gRPC 重新连接和重放插槽

Yellowstone gRPC 是一个功能强大、可用于生产环境且经过实战检验的工具,用于流式传输实时的 Solana 数据。但在实际条件下,网络中断或服务器重启可能导致连接中断。如果没有适当的重连策略,你的应用程序可能会错过区块链的关键更新。 为了防止这种情况,重要的是构建一个系统,该系统不仅自动重连,而且还从特定的 slot 恢复数据流,从而确保一致性、可靠性和零遗漏事件。

开始之前

要开始,我们需要准备一些东西。

认证:gRPC 端点和 gRPC Token Shyft 的 gRPC 节点遍布欧盟和美国地区的各个位置。要访问,我们需要一个特定于区域的 gRPC 端点和一个访问Token,你可以在 Shyft 仪表板 上购买。

服务器端后端(如 NodeJS)用于接收 gRPC 数据 由于 Web 浏览器不支持 gRPC 服务,因此你需要一个后端应用程序,例如 C#、Go、Java、Python 等,来接收 gRPC 数据。

代码示例:实现重连机制

为了确保流从临时断开连接中自动恢复,我们实现了一个简单的重连循环。如果连接因错误而断开,应用程序会等待一段短暂的延迟,然后使用相同的订阅请求重新启动流。这确保了连续的数据流,无需手动干预,即使在不稳定的网络条件下也是如此。

/*** 重连机制在 handle stream 函数中实现* 如果发生任何错误,流将等待 1000 毫秒,然后调用* handleStream 函数,该函数反过来将重新启动流*/
async function subscribeCommand(client: Client, args: SubscribeRequest) {while (true) {try {await handleStream(client, args); // 订阅并处理流} catch (error) {console.error("Stream error, retrying in 1 second...", error);await new Promise((resolve) => setTimeout(resolve, 1000));// 可以在这里更改超时时间}}
}

该代码演示了一个 while 循环,其中调用了 handleStream() 函数。handleStream() 函数负责订阅和接收流。一旦流中断,将从 handle stream 函数抛出一个错误,该错误在循环内处理。然后,循环等待给定的超时时间并迭代,重新发送订阅请求。

你可以查看 我们的文档 ,或者直接运行 Repl 此处的代码 以获取上面示例的完整代码。

代码示例:从特定 Slot 重放更新

为了避免在断开连接期间丢失任何数据,流会跟踪从每个交易更新收到的最后一个 slot。当流遇到错误时,它会尝试重新连接,并使用 SubscribeRequest 中的 fromSlot 字段从该确切 slot 恢复。此逻辑确保在重新连接时不会跳过任何交易更新。使用重试计数器来防止无限次尝试 — 达到限制后,系统会回退到从最新的可用 slot 进行流式传输。

require("dotenv").config();
import Client, { CommitmentLevel } from "@triton-one/yellowstone-grpc";
import { SubscribeRequest } from "@triton-one/yellowstone-grpc/dist/types/grpc/geyser";
import * as bs58 from "bs58";const MAX_RETRY_WITH_LAST_SLOT = 30;
const RETRY_DELAY_MS = 1000;
const ADDRESS_TO_STREAM_FROM = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P";type StreamResult = {lastSlot?: string;hasRcvdMSg: boolean;
};async function handleStream(client: Client,args: SubscribeRequest,lastSlot?: string
): Promise<StreamResult> {const stream = await client.subscribe();let hasRcvdMSg = false;return new Promise((resolve, reject) => {stream.on("data", (data) => {const tx = data.transaction?.transaction?.transaction;if (tx?.signatures?.[0]) {const sig = bs58.encode(tx.signatures[0]);console.log("Got tx:", sig);lastSlot = data.transaction.slot;hasRcvdMSg = true;}});stream.on("error", (err) => {stream.end();reject({ error: err, lastSlot, hasRcvdMSg });});const finalize = () => resolve({ lastSlot, hasRcvdMSg });stream.on("end", finalize);stream.on("close", finalize);stream.write(args, (err: any) => {if (err) reject({ error: err, lastSlot, hasRcvdMSg });});});
}async function subscribeCommand(client: Client, args: SubscribeRequest) {let lastSlot: string | undefined;let retryCount = 0;while (true) {try {if (args.fromSlot) {console.log("Starting stream from slot", args.fromSlot);}const result = await handleStream(client, args, lastSlot);lastSlot = result.lastSlot;if (result.hasRcvdMSg) retryCount = 0;} catch (err: any) {console.error(`Stream error, retrying in ${RETRY_DELAY_MS / 1000} second...`);await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY_MS));lastSlot = err.lastSlot;if (err.hasRcvdMSg) retryCount = 0;if (lastSlot && retryCount < MAX_RETRY_WITH_LAST_SLOT) {console.log(`#${retryCount} retrying with last slot ${lastSlot}, remaining retries ${MAX_RETRY_WITH_LAST_SLOT - retryCount}`);args.fromSlot = lastSlot;retryCount++;} else {console.log("Retrying from latest slot (no last slot available)");delete args.fromSlot;retryCount = 0;lastSlot = undefined;}}}
}const client = new Client(process.env.GRPC_URL!, process.env.X_TOKEN!, {"grpc.keepalive_permit_without_calls": 1,"grpc.keepalive_time_ms": 10000,"grpc.keepalive_timeout_ms": 1000,"grpc.default_compression_algorithm": 2,
});const req: SubscribeRequest = {accounts: {},slots: {},transactions: {pumpFun: {vote: false,failed: false,accountInclude: [ADDRESS_TO_STREAM_FROM],accountExclude: [],accountRequired: [],},},transactionsStatus: {},blocks: {},blocksMeta: {},entry: {},accountsDataSlice: [],commitment: CommitmentLevel.CONFIRMED,
};subscribeCommand(client, req);

与上一种方法类似,无限循环确保流在每次断开连接时都保持重新连接。我们初始化两个变量,一个用于存储 lastSlot,即从流收到的最新 slot,另一个用于 retryCount,它限制了从先前 slot 重试的次数,以避免卡在错误数据或间隙上。

if (args.fromSlot) {console.log("Starting stream from slot", args.fromSlot);
}

在启动流之前,代码检查是否设置了 fromSlot。如果是,流将从该特定 slot 恢复,而不是从最新的区块开始。handleStream 函数打开流,侦听传入的交易数据,并跟踪收到的最新 slot。如果收到任何数据,它会将流标记为成功(hasRcvdMSg = true)并重置重试计数器,以便系统可以在需要时继续从上次已知的 slot 重试。

const result = await handleStream(client, args, lastSlot);
lastSlot = result.lastSlot;
if (result.hasRcvdMSg) retryCount = 0;
  • 如果在发生错误之前成功记录了 lastSlot,则将在下一次尝试中重复使用它。
  • 如果之前的流确实传递了数据,我们将重置 retryCount

智能回退

if (lastSlot && retryCount < MAX_RETRY_WITH_LAST_SLOT) {args.fromSlot = lastSlot;retryCount++;
} else {delete args.fromSlot;retryCount = 0;lastSlot = undefined;
}

这是核心的弹性逻辑

  • 如果我们仍然有有效的 lastSlot 并且没有超过重试限制,我们将尝试从它恢复。
  • 如果我们重试的次数过多或者没有有效的 slot,我们将清除 fromSlot 并让流从区块链的顶端开始。

本文的完整代码可在 GitHub 上获取 — 随意克隆并进行测试。我们还在 GitHub 上分享了一系列涵盖 gRPC 和 DeFi 的示例用例,你可以克隆并进行实验。

结论

构建具有 基于 slot 的重放 的重连策略,可确保你的 Solana 应用程序 保持可靠和实时 — 即使在网络中断的情况下也是如此。通过跟踪上次收到的 slot 并智能地重试,你可以从中断的地方恢复流式传输,避免错过更新或重复数据。这种方法 增加了弹性,并保证了 任何生产级区块链应用程序的 更顺畅的 用户体验,更多相关文章,请,https://t.me/gtokentool。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/83344.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

foxmail - foxmail 启用超大附件提示密码与帐号不匹配

foxmail 启用超大附件提示密码与帐号不匹配 问题描述 在 foxmail 客户端中&#xff0c;启用超大附件功能&#xff0c;输入了正确的账号&#xff08;邮箱&#xff09;与密码&#xff0c;但是提示密码与帐号不匹配 处理策略 找到 foxmail 客户端目录/Global 目录下的 domain.i…

MySQL 事务(一)

文章目录 CURD不加控制&#xff0c;会有什么问题CURD满足什么属性&#xff0c;能解决上述问题&#xff1f;什么是事务为什么要有事务事务的版本支持了解事务的提交方式 事务常见操作方式研究并发场景事务的正常操作事务的非正常情况的案例结论事务操作的注意事项 CURD不加控制&…

CSS面试题汇总

在前端开发领域&#xff0c;CSS 是一项不可或缺的技术。无论是页面布局、样式设计还是动画效果&#xff0c;CSS 都扮演着重要的角色。因此&#xff0c;在前端面试中&#xff0c;CSS 相关的知识点往往是面试官重点考察的内容。为了帮助大家更好地准备面试&#xff0c;本文汇总了…

Java 后端给前端传Long值,精度丢失的问题与解决

为什么后端 Long 类型 ID 要转为 String&#xff1f; 在前后端分离的开发中&#xff0c;Java 后端通常使用 Long 类型作为主键 ID&#xff08;如雪花算法生成的 ID&#xff09;。但如果直接将 Long 返回给前端&#xff0c;可能会导致前端精度丢失的问题&#xff0c;特别是在 J…

对称二叉树的判定:双端队列的精妙应用

一、题目解析 题目描述 给定一个二叉树&#xff0c;检查它是否是镜像对称的。例如&#xff0c;二叉树 [1,2,2,3,4,4,3] 是对称的&#xff1a; 1/ \2 2/ \ / \ 3 4 4 3而 [1,2,2,null,3,null,3] 则不是镜像对称的&#xff1a; 1/ \2 2\ \3 3问题本质 判断一棵二叉…

C#数组与集合

&#x1f9e0; 一、数组&#xff08;Array&#xff09; 1. 定义和初始化数组 // 定义并初始化数组 int[] numbers new int[5]; // 默认值为 0// 声明并赋值 string[] names { "Tom", "Jerry", "Bob" };// 使用 new 初始化 double[] scores …

本地部署Scratch在线编辑器

1、说明 由于在GitHub上没有找到Scratch源码&#xff0c;所以只能编写脚本下载官网相关资源&#xff0c;然后在本地部署。 如果你找到了Scratch源码&#xff0c;请自行编译部署&#xff0c;可忽略以下操作。 项目结构&#xff1a;scratch.mit.edu |-- chunks | |-- fetch-w…

Gmsh 读取自定义轮廓并划分网格:深入解析与实践指南

一、Gmsh 简介 (一)Gmsh 是什么 Gmsh 是一款功能强大的开源有限元网格生成器,广泛应用于工程仿真、数值模拟以及计算机图形学等领域。它为用户提供了从几何建模到网格划分的一整套解决方案,能够有效处理复杂几何形状,生成高质量的二维和三维网格,满足多种数值方法的需求…

Elabscience 精准识别 CD4+ T 细胞|大鼠源单克隆抗体 GK1.5,适配小鼠样本的流式优选方案

内容概要 CD4 T细胞在免疫调节、自身免疫疾病及肿瘤免疫治疗中发挥关键作用。Elabscience推出的APC Anti-Mouse CD4 Antibody (GK1.5)&#xff08;货号&#xff1a;E-AB-F1097E&#xff09;是一款高特异性、低背景的流式抗体&#xff0c;专为小鼠CD4 T细胞亚群检测优化设计。该…

【RabbitMQ】应用问题、仲裁队列(Raft算法)和HAProxy负载均衡

&#x1f525;个人主页&#xff1a; 中草药 &#x1f525;专栏&#xff1a;【中间件】企业级中间件剖析 一、幂等性保障 什么是幂等性&#xff1f; 幂等性是指对一个系统进行重复调用&#xff08;相同参数&#xff09;&#xff0c;无论同一操作执行多少次&#xff0c;这些请求…

51 单片机头文件 reg51.h 和 reg52.h 详解

51 单片机头文件详解 51 单片机的头文件reg51.h和reg52.h是开发中非常重要的文件,它们定义了单片机的特殊功能寄存器 (SFR) 和位地址。以下是对这两个头文件的详细解析: 1. 头文件概述 reg51.h:针对标准 8051 单片机(4KB ROM, 128B RAM) reg52.h:针对增强型 8052 单片…

前端的面试笔记——JavaScript篇(二)

一、instanceof 在 JavaScript 里&#xff0c;instanceof 是一个相当实用的运算符&#xff0c;它的主要功能是检查某个对象是否属于特定构造函数的实例。这里需要明确的是&#xff0c;判断的依据并非对象的类型&#xff0c;而是其原型链。下面为你详细介绍它的用法和特点&…

”一维前缀和“算法原理及模板

前缀和&#xff0c;就是通过一种方法来求出数组中某个连续区间的元素的和的办法。我们通常先预处理出来一个前缀和数组&#xff0c;然后把数组中进行元素填充后再进行后续使用。 我们通过一道模板题或许能更加理解其意思。 现在的问题就是&#xff1a;如果我们用暴力枚举来记录…

5.13/14 linux安装centos及一些操作命令随记

一、环境准备 VMware Workstation版本选择建议 CentOS 7 ISO镜像下载指引 虚拟机硬件配置建议&#xff08;内存/处理器/磁盘空间&#xff09; 二、系统基础命令 一、环境准备 1.VMware Workstation版本选择建议 版本选择依据 选择VMware Workstation的版本时&#xff0c…

spring学习->sprintboot

spring IoC(控制翻转): 控制:资源的控制权(资源的创建&#xff0c;获取&#xff0c;销毁等) 反转:和传统方式不一样(用上面new什么)&#xff0c;不用new让ioc来发现你用什么&#xff0c;然后我来给什么 DI:(依赖注入) 依赖:组件的依赖关系。如newsController依赖NewsServi…

iOS 阅后即焚功能的实现

iOS阅后即焚功能实现步骤 一、功能设计要点 消息类型支持&#xff1a;文本、图片、视频、音频等。销毁触发条件&#xff1a; 接收方首次打开消息后启动倒计时。消息存活时间可配置&#xff08;如5秒、1分钟&#xff09;。 安全要求&#xff1a; 端到端加密&#xff08;E2EE&a…

OpenHarmony 开源鸿蒙南向开发——linux下使用make交叉编译第三方库——mqtt库

准备工作 请依照这篇文章搭建环境 OpenHarmony 开源鸿蒙南向开发——linux下使用make交叉编译第三方库——环境配置_openharmony交叉编译-CSDN博客 下载 wget ftp://ftp.gnutls.org/gcrypt/gnutls/v3.5/gnutls-3.5.9.tar.xz 解压 tar -xf mkdir ./out cd ./out Cmake命…

武汉SMT贴片工艺优化与生产效能提升路径

内容概要 随着华中地区电子制造产业集群的快速发展&#xff0c;武汉SMT贴片行业面临工艺升级与效能提升的双重挑战。本文聚焦SMT生产全流程中的关键环节&#xff0c;从钢网印刷精度控制、回流焊温度曲线优化、AOI检测系统迭代三大核心工艺出发&#xff0c;结合区域产业链特点提…

线程池(ThreadPoolExecutor)实现原理和源码细节是Java高并发面试和实战开发的重点

一、线程池核心流程图 ----------------- | 提交任务 | submit/execute -----------------|v ----------------- | 判断核心线程数 | < corePoolSize&#xff1f; -----------------|Yes |Nov v [创建新线程] -----------------| 队列是否满&a…

学习海康VisionMaster之直方图工具

一&#xff1a;进一步学习了 今天学习下VisionMaster中的直方图工具&#xff1a;就是统计在ROI范围内进行灰度级分布的统计 二&#xff1a;开始学习 1&#xff1a;什么是直方图工具&#xff1f; 直方图工具针对输入灰度图像的指定ROI区域&#xff0c;输出该区域的图像灰度直方…