如何使用队列处理 API 速率限制

对于遇到速率限制的应用程序来说也是一个挑战,因为它需要“放慢速度”或暂停。这是一个典型的场景:

  • 初始请求:当应用程序发起与 API 的通信时,它会请求特定的数据或功能。
  • API 响应: API 处理请求并响应请求的信息或执行所需的操作。
  • 速率限制:如果应用程序已达到限制,通常需要等到下一个指定的时间范围(例如一分钟到一小时)才能发出其他请求。如果它是“软”速率限制并且时间范围已知并且是线性的,则更容易处理。通常,每个区块的等待时间都会增加,需要对每个 API 进行完全不同的自定义处理。
  • 处理超出速率限制:如果应用程序超出速率限制,它可能会收到来自 API 的错误响应(例如“429 Too Many Requests”状态代码)。应用程序需要优雅地处理这个问题,可能是通过对请求进行排队、实施退避策略(在重试之前等待逐渐延长的时间)或通知用户已达到速率限制。

为了在速率限制内有效运行,应用程序通常采用以下策略:

  • 限制:调节传出请求的速率以符合 API 的速率限制。
  • 缓存:将频繁请求的数据存储在本地,以减少重复 API 调用的需要。
  • 指数退避:实施一种策略,使应用程序在达到速率限制后在后续重试之间等待的时间越来越长,以减少服务器负载并防止立即重试。
  • 队列? 下一节将详细介绍

使用队列

由于队列能够系统地处理任务,因此可以作为出色的“助手”或工具来帮助服务管理速率限制。然而,虽然它提供了显着的好处,但它并不是用于此目的的独立解决方案。

在构建健壮的架构时,用于与受速率限制的外部 API 交互的服务或应用程序通常会异步处理任务。该服务通常由从队列派生的任务启动。当服务遇到速率限制时,它可以轻松地将作业返回到主队列或将其分配到指定用于延迟任务的单独队列,并在特定的等待时间(例如 X 秒)后重新访问它。

这种对队列系统的依赖是非常有利的,主要是因为它的临时性质和排序。然而,仅靠队列并不能完全解决速率限制问题;它需要额外的功能或服务本身的帮助才能有效地处理这些限制。

使用队列时可能会出现挑战

  • 重新进入队列的任务可能会比必要的时间更早返回,因为它们的时间不直接由您的服务控制。
  • 由于在有限时间内频繁拨打电话而超出速率限制。这可能需要实施睡眠或等待机制,由于它们对性能和响应能力的潜在影响,通常被认为是不好的做法。

RabbitMQ

const amqp = require('amqplib');
const axios = require('axios');// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {try {const response = await axios.get(url);console.log('API Response:', response.data);} catch (error) {console.error('API Error:', error.message);}
}// Connect to RabbitMQ server
async function connect() {try {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'rateLimitedQueue';channel.assertQueue(queue, { durable: true });// Consume messages from the queuechannel.consume(queue, async msg => {const { url, delayInSeconds } = JSON.parse(msg.content.toString());// Simulating rate limitationawait new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));await makeAPICall(url); // Make the API callchannel.ack(msg); // Acknowledge message processing completion});} catch (error) {console.error('RabbitMQ Connection Error:', error.message);}
}// Function to send a message to the queue
async function addToQueue(url, delayInSeconds) {try {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'rateLimitedQueue';channel.assertQueue(queue, { durable: true });const message = JSON.stringify({ url, delayInSeconds });channel.sendToQueue(queue, Buffer.from(message), { persistent: true });console.log('Task added to the queue');} catch (error) {console.error('RabbitMQ Error:', error.message);}
}// Usage example
addToQueue('https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds// Start the consumer
connect();

Kafka

const { Kafka } = require('kafkajs');
const axios = require('axios');// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {try {const response = await axios.get(url);console.log('API Response:', response.data);} catch (error) {console.error('API Error:', error.message);}
}// Kafka configuration
const kafka = new Kafka({clientId: 'my-app',brokers: ['localhost:9092'], // Replace with your Kafka broker address
});// Create a Kafka producer
const producer = kafka.producer();// Connect to Kafka and send messages
async function produceToKafka(topic, message) {await producer.connect();await producer.send({topic,messages: [{ value: message }],});await producer.disconnect();
}// Create a Kafka consumer
const consumer = kafka.consumer({ groupId: 'my-group' });// Consume messages from Kafka topic
async function consumeFromKafka(topic) {await consumer.connect();await consumer.subscribe({ topic });await consumer.run({eachMessage: async ({ message }) => {const { url, delayInSeconds } = JSON.parse(message.value.toString());// Simulating rate limitationawait new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));await makeAPICall(url); // Make the API call},});
}// Usage example - Sending messages to Kafka topic
async function addToKafka(topic, url, delayInSeconds) {const message = JSON.stringify({ url, delayInSeconds });await produceToKafka(topic, message);console.log('Message added to Kafka topic');
}// Start consuming messages from Kafka topic
const kafkaTopic = 'rateLimitedTopic';
consumeFromKafka(kafkaTopic);// Usage example - Adding messages to Kafka topic
addToKafka('rateLimitedTopic', 'https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds

这两种方法都是合法的,但它们需要您的服务包含“睡眠”机制。
借助 Memphis,您可以使用专门为此目的而设计的称为“延迟消息”的简单功能,将延迟从客户端转移到队列。当您的消费者应用程序需要额外的处理时间时,延迟消息允许您将收到的消息发送回代理。

孟菲斯实施的独特之处在于消费者能够独立且原子地控制这种延迟。
在站内,未消费消息的计数不会影响延迟消息的消费。例如,如果需要 60 秒的延迟,它会精确配置该特定消息的不可见时间。

Memphis.dev 延迟消息

  1. 消费者组收到一条消息。
  2. 发生事件,提示消费者组暂停处理消息。
  3. 假设maxMsgDeliveries尚未达到其限制,消费者将激活message.delay(delayInMilliseconds),绕过消息。代理不会立即重新处理同一消息,而是将其保留指定的持续时间。
  4. 后续消息将被消费。
  5. 一旦请求delayInMilliseconds通过,代理将停止主要消息流并将延迟的消息重新引入循环。

孟菲斯

const { memphis } = require('memphis-dev');// Function to make API requests, simulating rate limitations 
async function makeAPICall(message) 
{ try { const response = await axios.get(message.getDataAsJson()['url']); console.log('API Response:', response.data); message.ack();} catch (error) { console.error('API Error:', error.message); console.log("Delaying message for 1 minute"); message.delay(60000);} 
}(async function () {let memphisConnection;try {memphisConnection = await memphis.connect({host: '<broker-hostname>',username: '<application-type username>',password: '<password>'});const consumer = await memphisConnection.consumer({stationName: '<station-name>',consumerName: '<consumer-name>',consumerGroup: ''});consumer.setContext({ key: "value" });consumer.on('message', (message, context) => {await makeAPICall(url, message);});consumer.on('error', (error) => { });} catch (ex) {console.log(ex);if (memphisConnection) memphisConnection.close();}
})();

结论

了解并遵守速率限制对于使用 API 的应用程序开发人员至关重要。它涉及管理请求频率、达到限制时处理错误、实施退避策略以防止 API 服务器过载以及利用 API 提供的速率限制信息来优化应用程序性能,现在您也知道如何使用队列来做到这一点!


作者:Idan Asulin

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/578270.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

12.25

led.c #include "led.h" void all_led_init() {RCC_GPIO | (0X3<<4);//时钟使能GPIOE_MODER &(~(0X3<<20));//设置PE10输出GPIOE_MODER | (0X1<<20);//设置PE10为推挽输出GPIOE_OTYPER &(~(0x1<<10));//PE10为低速输出GPIOE_OSPEED…

单集群400TB,OceanBase稳定支撑快手核心业务场景

一款日均超过千万人访问的短视频 App 快手&#xff0c;面对高并发流量如何及时有效地处理用户请求&#xff1f;通过在后端配置多套 MySQL 集群来支撑高流量访问&#xff0c;以解决大数据量存储和性能问题&#xff0c;这种传统的 MySQL 分库分表方案有何问题&#xff1f;快手对分…

评估回馈电子负载的重要指标?

回馈电子负载是用于测试电源、电池和其他电子设备性能的设备。它可以模拟实际负载&#xff0c;同时将多余的能量回馈到电网或电池中。在选择和使用回馈电子负载时&#xff0c;有几个重要的指标需要考虑&#xff1a; 功率范围&#xff1a;回馈电子负载的功率范围是指其能够提供的…

巅峰画师Midjourney:新时代的独角兽

介绍 AI绘画领域中&#xff0c;Midjourney处于绝对地位&#xff0c;并且一年时间就登顶。 Midjourney是一家独立的AI研究实验室,探索新的思维媒介,拓展人类的想象力。 它由一个小型的自筹资金团队组成,专注于设计、人类基础设施和AI。 在AI绘画领域,Midjourney取得了非常突出…

百度Apollo五步入门自动驾驶:Dreamview与离线数据包分析(文末赠送apollo周边)

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《linux深造日志》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 粉丝福利活动 ✅参与方式&#xff1a;通过连接报名观看课程&#xff0c;即可免费获取精美周边 ⛳️活动链接&#xf…

【PDF密码】 一键强制去掉pdf密码

想要给PDF文件设置一个密码防止他人对文件进行编辑&#xff0c;那么我们可以对PDF文件设置限制编辑&#xff0c;设置方法很简单&#xff0c;我们在PDF编辑器中点击文件 – 属性 – 安全&#xff0c;在权限下拉框中选中【密码保护】 然后在密码保护界面中&#xff0c;我们勾选【…

通过three.js玩转车展项目

1.项目搭建 1.1 创建文件夹 mkdir 文件名1.2 初始化package.json npm init -y1.3 安装打包工具并配置相关依赖 npm i parcel -d在package.json中打包路径和指令 1.4 安装three.js npm i three -d2.项目搭建 2.1 新建index.html&#xff0c;并再index.html引入car.js,在…

从流星雨启程:Python和Pygame下载与安装全过程

文章目录 一、前言二、下载安装过程1.官网下载安装包2.安装python过程第一步第二步第三步第四步第五步安装完成 3.简单测试Python3.1 检查 Python 版本号3.2 打开 Python 解释器3.3 输入你的第一个代码3.4 运行 Python 脚本 4.安装Pygame4.1 cmd命令安装Pygame4.2 pip升级4.3 安…

实战:朴素贝叶斯文本分类器搭建与性能评估

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

Java 快速入门

简介 跨平台性&#xff1a;Java 最大的优势之一就是跨平台性&#xff0c;即一份 Java 程序可以在多平台上运行&#xff0c;而无需重写。 简单易学&#xff1a;Java 的语法和面向对象的开发方式非常简单易学。 安全性&#xff1a;Java 对于安全性的处理非常慎重&#xff0c;对…

接口测试和功能测试

本文主要分为两个部分&#xff1a; 第一部分&#xff1a;主要从问题出发&#xff0c;引入接口测试的相关内容并与前端测试进行简单对比&#xff0c;总结两者之前的区别与联系。但该部分只交代了怎么做和如何做&#xff1f;并没有解释为什么要做&#xff1f; 第二部分&#xf…

wifi驱动打开双模式

双模式 3.1 开启双模式 在Makefile添加EXTRA_CFLAGS -DCONFIG_CONCURRENT_MODE 重新编译&#xff08;make之后发现不正常工作&#xff0c;需要make clean清理一下&#xff09;。 再用sudo rmmod 8821cu.ko&#xff0c;重新启动。出现wlan1&#xff1a; 出现问题&#xff1…

助力打造清洁环境,基于轻量级DETR(DEtectionTRansformer)开发构建公共场景下垃圾堆放垃圾桶溢出检测识别系统

公共社区环境生活垃圾基本上是我们每个人每天几乎都无法避免的一个问题&#xff0c;公共环境下垃圾投放点都会有固定的值班时间&#xff0c;但是考虑到实际扔垃圾的无规律性&#xff0c;往往会出现在无人值守的时段内垃圾堆放垃圾桶溢出等问题&#xff0c;有些容易扩散的垃圾比…

2024年PMP考试新考纲-PMBOK第七版-项目绩效域真题解析

如何一次性通过PMP考试&#xff0c;取得3A等级的PMP证书&#xff1f;华研荟根据十多年的培训和辅导&#xff0c;以及数千名学员的建议是&#xff1a; 先把PMBOK第六版、第七版和敏捷实践指南的三本官方教材研读一遍&#xff08;如果觉得自己看书慢&#xff0c;可以看讲解视频&…

基于多反应堆的高并发服务器【C/C++/Reactor】(中)Dispatcher模块的实现思路和定义

&#xff08;四&#xff09;Dispatcher模块的实现思路 关于dispatcher&#xff0c;它应该是反应堆模型里边的核心组成部分&#xff0c;因为如果说这个反应堆模型里边有事件需要处理&#xff0c;或者说有事件需要检测&#xff0c;那么是需要通过这个poll、epoll 或者 select来完…

Spring Boot3 Web开发技术

前期回顾 springboot项目常见的配置文件类型有哪些&#xff1f;哪种类型的优先级最高 yml properties yaml 读取配置文件里的数据用什么注解&#xff1f; value restful风格 RESTful 风格与传统的 HTTP 请求方式相比&#xff0c;更加简洁&#xff0c;安全&#xff0c;能隐…

Kind创建k8s - JAVA操作控制

kind 简介kind 架构安装 Kind (必备工具)docker官网kubectl官网kind官网校验安装结果 关于kind 命令 安装一个集群查看当前 Kubernetes 集群中的节点信息。查看当前命名空间下中的Pod&#xff08;容器实例&#xff09;的信息。使用 kind create cluster 安装&#xff0c;关于安…

MYSQL一一函数一一流程函数

咱今天讲的是MySQL函数中的流程函数&#xff0c;会有3小题和一个综合案例帮助大家理解 流程函数是很常用的一类函数&#xff0c;可以在SQL语句中实现条件筛选&#xff0c;从而提高语句的效率 小题&#xff1a; ①if语句&#xff1a; select if(flash,ok,error); //如果…

Java之Atomic 原子类总结

Java之Atomic 原子类总结 Atomic 原子类介绍 Atomic 翻译成中文是原子的意思。在化学上&#xff0c;我们知道原子是构成一般物质的最小单位&#xff0c;在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候&#xff0c;一…

【Java】你掌握了多线程吗?

【文末送书】今天推荐一本Java多线程编程领域新书《一本书讲透Java线程》 摘要 互联网的每一个角落&#xff0c;无论是大型电商平台的秒杀活动&#xff0c;社交平台的实时消息推送&#xff0c;还是在线视频平台的流量洪峰&#xff0c;背后都离不开多线程技术的支持。在数字化转…