印度尼西亚股票数据API对接实现

news/2025/9/18 12:43:47/文章来源:https://www.cnblogs.com/CryptoRzz/p/19098520

环境准备

首先安装必要的依赖包:

pip install requests websocket-client pandas numpy

基础配置

import requests
import json
import websocket
import threading
import time
from datetime import datetime# API配置
API_KEY = "YOUR_API_KEY"  # 替换为您的实际API密钥
BASE_URL = "https://api.stocktv.top"
WS_URL = "wss://ws-api.stocktv.top/connect"# 印尼股票代码映射(示例)
IDX_SYMBOLS = {"BBCA": "Bank Central Asia","BBRI": "Bank Rakyat Indonesia","TLKM": "Telkom Indonesia","ASII": "Astra International","UNVR": "Unilever Indonesia"
}

REST API实现

1. 获取印尼股票列表

def get_indonesia_stocks(page=1, page_size=100):"""获取印尼交易所股票列表"""url = f"{BASE_URL}/stock/stocks"params = {"countryId": 42,      # 印尼国家ID"exchangeId": 62,     # IDX交易所ID"pageSize": page_size,"page": page,"key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200:return data["data"]["records"]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching stock list: {str(e)}")return []# 示例:获取第一页股票列表
stocks = get_indonesia_stocks()
for stock in stocks:print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")

2. 查询特定股票详情

def get_stock_detail(symbol_or_id):"""获取股票详细信息"""url = f"{BASE_URL}/stock/queryStocks"# 判断是symbol还是idif isinstance(symbol_or_id, str) and symbol_or_id.isdigit():params = {"id": symbol_or_id, "key": API_KEY}else:params = {"symbol": symbol_or_id, "key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200 and data["data"]:return data["data"][0]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching stock detail: {str(e)}")return None# 示例:获取BBCA股票详情
bbca_detail = get_stock_detail("BBCA")
if bbca_detail:print(f"BBCA当前价格: {bbca_detail['last']}")print(f"涨跌幅: {bbca_detail['chgPct']}%")

3. 获取指数数据

def get_indonesia_indices():"""获取印尼主要指数"""url = f"{BASE_URL}/stock/indices"params = {"countryId": 42,  # 印尼国家ID"key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200:return data["data"]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching indices: {str(e)}")return []# 示例:获取印尼指数
indices = get_indonesia_indices()
for index in indices:print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")

4. 获取K线数据

def get_kline_data(pid, interval="PT15M"):"""获取股票K线数据"""url = f"{BASE_URL}/stock/kline"params = {"pid": pid,"interval": interval,"key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200:return data["data"]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching kline data: {str(e)}")return []# 示例:获取BBCA的15分钟K线数据
bbca_kline = get_kline_data(41602, "PT15M")
for kline in bbca_kline[:5]:  # 显示前5条dt = datetime.fromtimestamp(kline["time"] / 1000)print(f"{dt}: O:{kline['open']} H:{kline['high']} L:{kline['low']} C:{kline['close']}")

5. 获取涨跌排行榜

def get_top_gainers():"""获取涨幅榜"""url = f"{BASE_URL}/stock/updownList"params = {"countryId": 42,  # 印尼国家ID"type": 1,         # 1=涨幅榜, 2=跌幅榜"key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200:return data["data"]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching top gainers: {str(e)}")return []# 示例:获取印尼涨幅榜
gainers = get_top_gainers()
for stock in gainers[:10]:  # 显示前10名print(f"{stock['symbol']}: {stock['last']} (+{stock['chgPct']}%)")

WebSocket实时数据

WebSocket客户端实现

class IDXWebSocketClient:def __init__(self, api_key):self.api_key = api_keyself.ws = Noneself.connected = Falseself.subscriptions = set()# 启动连接self.connect()# 启动心跳线程threading.Thread(target=self.heartbeat, daemon=True).start()def connect(self):"""连接WebSocket服务器"""ws_url = f"{WS_URL}?key={self.api_key}"self.ws = websocket.WebSocketApp(ws_url,on_open=self.on_open,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)# 启动WebSocket线程threading.Thread(target=self.ws.run_forever).start()time.sleep(1)  # 等待连接建立def on_open(self, ws):"""连接建立回调"""print("已连接到印尼股票实时数据服务")self.connected = True# 重新订阅之前订阅的股票if self.subscriptions:self.subscribe(list(self.subscriptions))def on_message(self, ws, message):"""接收消息回调"""try:data = json.loads(message)# 处理实时行情数据if "pid" in data:symbol = data.get("symbol", "Unknown")price = data.get("last_numeric", 0)change = data.get("pc", 0)change_pct = data.get("pcp", 0)print(f"实时行情 [{symbol}]: {price} ({change} / {change_pct}%)")# 处理心跳响应elif data.get("action") == "pong":passexcept Exception as e:print(f"处理实时数据时出错: {str(e)}")def on_error(self, ws, error):"""错误处理回调"""print(f"WebSocket错误: {str(error)}")def on_close(self, ws, close_status_code, close_msg):"""连接关闭回调"""print("WebSocket连接已关闭")self.connected = False# 尝试重新连接print("尝试重新连接...")time.sleep(3)self.connect()def subscribe(self, pids):"""订阅股票"""if not self.connected:print("未连接,无法订阅")return False# 添加到订阅列表self.subscriptions.update(pids)# 构造订阅消息message = json.dumps({"action": "subscribe","pids": list(pids)})# 发送订阅请求self.ws.send(message)print(f"已订阅: {', '.join(map(str, pids))}")return Truedef unsubscribe(self, pids):"""取消订阅股票"""if not self.connected:print("未连接,无法取消订阅")return False# 从订阅列表中移除for pid in pids:self.subscriptions.discard(pid)# 构造取消订阅消息message = json.dumps({"action": "unsubscribe","pids": list(pids)})# 发送取消订阅请求self.ws.send(message)print(f"已取消订阅: {', '.join(map(str, pids))}")return Truedef heartbeat(self):"""心跳维护"""while True:if self.connected:try:# 每30秒发送一次心跳self.ws.send(json.dumps({"action": "ping"}))except Exception as e:print(f"发送心跳失败: {str(e)}")time.sleep(30)# 使用示例
if __name__ == "__main__":# 创建WebSocket客户端ws_client = IDXWebSocketClient(API_KEY)# 订阅股票(需要先获取股票ID)time.sleep(2)  # 等待连接建立ws_client.subscribe([41602, 41605])  # 订阅BBCA和BRIS# 保持主线程运行try:while True:time.sleep(1)except KeyboardInterrupt:print("程序已终止")

高级功能实现

1. 数据缓存策略

from cachetools import TTLCacheclass IDXDataCache:def __init__(self, maxsize=100, ttl=60):"""初始化数据缓存"""self.cache = TTLCache(maxsize=maxsize, ttl=ttl)def get_stock_data(self, symbol_or_id):"""获取股票数据(带缓存)"""# 检查缓存if symbol_or_id in self.cache:return self.cache[symbol_or_id]# 从API获取data = get_stock_detail(symbol_or_id)# 更新缓存if data:self.cache[symbol_or_id] = datareturn data# 使用示例
cache = IDXDataCache()
bbca_data = cache.get_stock_data("BBCA")

2. 实时数据处理器

class RealTimeDataProcessor:def __init__(self):self.data_buffer = {}self.batch_size = 10self.last_process_time = time.time()def add_data(self, symbol, data):"""添加实时数据到缓冲区"""if symbol not in self.data_buffer:self.data_buffer[symbol] = []self.data_buffer[symbol].append(data)# 检查是否达到批处理条件current_time = time.time()if (len(self.data_buffer[symbol]) >= self.batch_size or current_time - self.last_process_time >= 1.0):self.process_data(symbol)self.last_process_time = current_timedef process_data(self, symbol):"""处理缓冲区的数据"""if symbol not in self.data_buffer or not self.data_buffer[symbol]:returndata_points = self.data_buffer[symbol]# 计算统计指标prices = [d["last_numeric"] for d in data_points]volumes = [d.get("turnover_numeric", 0) for d in data_points]avg_price = sum(prices) / len(prices)max_price = max(prices)min_price = min(prices)total_volume = sum(volumes)print(f"\n{symbol} 实时数据统计 (最近 {len(data_points)} 个更新):")print(f"平均价格: {avg_price:.2f}, 最高: {max_price:.2f}, 最低: {min_price:.2f}")print(f"总成交量: {total_volume}")# 清空缓冲区self.data_buffer[symbol] = []# 在WebSocket客户端的on_message方法中使用
processor = RealTimeDataProcessor()def on_message(self, ws, message):try:data = json.loads(message)if "pid" in data:symbol = data.get("symbol", "Unknown")processor.add_data(symbol, data)except Exception as e:print(f"处理实时数据时出错: {str(e)}")

3. 错误处理与重试机制

def api_request_with_retry(url, params, max_retries=3):"""带重试机制的API请求"""for attempt in range(max_retries):try:response = requests.get(url, params=params, timeout=10)if response.status_code == 200:data = response.json()if data.get("code") == 200:return dataelif data.get("code") == 429:  # 请求过多retry_after = int(data.get("retryAfter", 30))print(f"请求过于频繁,等待 {retry_after} 秒后重试...")time.sleep(retry_after)else:print(f"API返回错误: {data.get('message')}")else:print(f"请求失败,状态码: {response.status_code}")except Exception as e:print(f"请求异常: {str(e)}")# 指数退避等待wait_time = 2 ** attemptprint(f"等待 {wait_time} 秒后重试 (尝试 {attempt+1}/{max_retries})")time.sleep(wait_time)print(f"请求失败,已达最大重试次数 {max_retries}")return None

完整示例应用

class IDXStockMonitor:def __init__(self, api_key):self.api_key = api_keyself.ws_client = Noneself.data_cache = IDXDataCache()self.monitored_stocks = set()def start_monitoring(self, symbols):"""开始监控指定股票"""print("开始监控印尼股票...")# 获取股票IDstock_ids = []for symbol in symbols:stock_data = self.data_cache.get_stock_data(symbol)if stock_data:stock_ids.append(stock_data["id"])self.monitored_stocks.add(symbol)print(f"已添加监控: {symbol} (ID: {stock_data['id']})")else:print(f"无法获取股票信息: {symbol}")# 启动WebSocket连接if stock_ids:self.ws_client = IDXWebSocketClient(self.api_key)time.sleep(2)  # 等待连接建立self.ws_client.subscribe(stock_ids)# 启动定期数据更新self.start_periodic_updates()def start_periodic_updates(self):"""启动定期数据更新"""def update_loop():while True:# 每5分钟更新一次指数数据self.update_indices()# 每10分钟更新一次股票列表if len(self.monitored_stocks) < 10:  # 只更新少量股票self.update_stock_list()time.sleep(300)  # 5分钟threading.Thread(target=update_loop, daemon=True).start()def update_indices(self):"""更新指数数据"""print("\n更新印尼指数数据...")indices = get_indonesia_indices()for index in indices:print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")def update_stock_list(self):"""更新股票列表"""print("\n更新印尼股票列表...")stocks = get_indonesia_stocks(page_size=50)for stock in stocks[:10]:  # 只显示前10只print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")def run(self):"""运行监控"""try:# 监控主要印尼股票symbols_to_monitor = ["BBCA", "BBRI", "TLKM", "ASII", "UNVR"]self.start_monitoring(symbols_to_monitor)# 保持主线程运行while True:time.sleep(1)except KeyboardInterrupt:print("\n监控已停止")except Exception as e:print(f"监控出错: {str(e)}")# 启动监控
if __name__ == "__main__":monitor = IDXStockMonitor(API_KEY)monitor.run()

部署建议

  1. API密钥管理:不要将API密钥硬编码在代码中,使用环境变量或配置文件
  2. 错误处理:增加更完善的错误处理和日志记录
  3. 速率限制:遵守API的速率限制,避免频繁请求
  4. 数据存储:考虑将重要数据存储到数据库中以供后续分析
  5. 监控告警:设置价格波动告警机制
# 从环境变量获取API密钥
import os
API_KEY = os.getenv("STOCKTV_API_KEY", "YOUR_API_KEY")

以上是一个完整的印尼股票数据API对接实现方案。您可以根据实际需求进行调整和扩展。如果您需要更多特定功能或有任何问题,请随时告诉我。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/907122.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenBMB 发布无分词器 TTS VoxCPM;儿童口语硬件 Dex 融资 480 万美元:拍摄真实物体,对话学习外语丨日报

开发者朋友们大家好:这里是 「RTE 开发者日报」,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE(Real-Time Engagement) 领域内「有话题的技术」、「有亮点的产品」、「有思考的文章」、「有态度…

一天一款实用的AI工具,第1期,AI标题生成工具

本期介绍的是一款专业的标题生成工具,它能帮你产出高质量标题,让点击率提升,让内容被看见。现实问题 在内容创作的世界里,有句话特别扎心: 好的标题=成功的一半。 很多创作者都遇到过这样的困境: 花了一下午写好…

重组蛋白表达避坑指南

重组蛋白表达避坑指南重组蛋白表达是分子生物学、生物技术以及生物医学研究中非常基础却经常“出问题”的环节。一个合适的蛋白表达方案,不仅要能产生足够的产量,还要确保蛋白正确折叠、具有功能、具有良好的纯度与稳…

易被忽略的vim中视图模式

常见的都是vim三种模式,但视图模式也不可忽略,主要进行批量操作在 Vim 中,可视模式(Visual Mode)是一种强大的文本选择和编辑模式,允许你高亮选中一段文本,然后对其进行操作(如复制、删除、替换、注释等)。 一…

详细介绍:智慧校园统一身份认证中心:一个账号畅行校园内外

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

电商核心业务 - 指南

电商核心业务 - 指南pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "…

一言

一些日常的感想,为了节约时间,为了不暴露太多东西,为了不挑起矛盾,内容会很简洁,在合适的时候公布详情。9.17 说好的向阳而生呢?冷静啊,兄弟。 9.18 你们不相信我,我必将证明我,夺回属于我的荣耀。

ai

https://qsqs.life/login?redirect=/system/dashboard本文来自博客园,作者:zjxgdq,转载请注明原文链接:https://www.cnblogs.com/zjxzhj/p/19098509

LlamaIndex 项目深度技术分析 - 详解

LlamaIndex 项目深度技术分析 - 详解pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monac…

苏州才是最美的烟雨江南,苏州游玩必去的10大景点

苏州才是最美的烟雨江南,苏州游玩必去的10大景点 蜘蛛指南 关注2024-05-22 16:22 北京 来源:澎湃新闻澎湃号湃客 字号苏州人间天堂 最美的烟雨江南 苏州,一个极具江南风情的城市,既有园林之美,也有诗情画意,也是…

深入解析:css消除图片下的白边

深入解析:css消除图片下的白边pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco&quo…

linux增加网卡ip地址

linux增加网卡ip地址example ip addr add 192.168.5.124/24 dev eth0 label eth0:5 ifconfig eth0:5 up ip addr del 192.168.1.100/24 dev eth0 example ip addr add 192.168.10.199/24 dev eth0 label eth0:10 route…

Python 包与环境管理简史:从混乱到优雅

自动包管理工具的先驱:easy_install 在一切规范化工具出现之前,Python 的包管理是相当原始的。开发者们需要把第三方库的源码下载下来,手动放到项目目录里。 为了解决自动安装包的问题,easy_install 应运而生。 20…

qoj853 Flat Organization

SOLUTION FROM WUMIN4 题意 给出一个 \(n\) 个点的带权竞赛图(定向完全图),你可以进行任意次操作,每次操作反转一条边,代价为边权,求使得图强连通的最小代价和与方案,或输出无解。 \(n\le 2000\)。 思路 我们先…

实用指南:Chromium 138 编译指南 macOS 篇:Xcode 与开发工具安装配置(二)

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

shell命令中循环执行操作的命令

shell命令中循环执行操作的命令reference: for i in $(seq 1 10000); do echo "Iteration $i" && echo "Iteration $i"; done for i in $(seq 1 10000); do cat /sys/class/net/eth0/carr…

2025年9月中国数据库排行榜:达梦挺进榜眼位,崖山首入前十强

9月墨天轮排行榜解读已出炉!本月前十变动较大,老将突围、新秀崛起,达梦凭借强劲势头跃升至第二位、TiDB排名上升、崖山首次闯入前十,此外还有一些产品表现亮眼!本月墨天轮社区的中国数据库排行榜再起波澜。达梦凭…

基于QEMU模拟器搭建Builtroot下的QT开发环境

基于QEMU模拟器搭建Builtroot下的QT开发环境https://www.cnblogs.com/arnoldlu/p/17250728.html

OpenSSH漏洞修复

前期准备 (先使用Telnet远程连接工具,连接服务器,确保Telnet连接正常,SSH连接后进行漏洞修复升级(防止修复失败,导致远程连接无法连接时,可以通过另一个远程工具连接进行恢复) telnet安装与开启:https://www.…

5个存款技巧,银行员工都在用,你知道几个?

5个存款技巧,银行员工都在用,你知道几个?Posted on 2025-09-18 12:05 lzhdim 阅读(0) 评论(0) 收藏 举报对于普通人来说,辛苦打拼攒下的钱,那都是血汗的结晶。去银行存钱时,咱们往往只知道常规操作,可是银…