拼多多的搜索接口采用了与京东、淘宝截然不同的技术架构 —— 核心商品数据通过 WebSocket 实时推送,配合多层参数加密和动态签名验证,传统的 HTTP 接口模拟方案几乎完全失效。本文将突破这种 "HTTP+WebSocket" 混合架构的壁垒,通过逆向 WebSocket 握手协议和数据加密逻辑,实现高并发、低延迟的关键字搜索,并创新性地提出 "商品数据基因链" 模型,解决拼多多商品信息碎片化问题。
一、搜索接口核心架构与反爬机制解析
拼多多搜索系统采用 "双轨数据传输" 模式,比传统电商的单一 HTTP 接口复杂得多:
数据传输通道 核心作用 反爬特征
HTTP 预热接口 https://mobile.yangkeduo.com/proxy/api/search 负责初始化搜索会话,返回ws_url(WebSocket 地址)和sign(握手签名) 携带anti_content参数(设备指纹加密),Referer必须为https://mobile.yangkeduo.com/
WebSocket 数据通道 wss://ws.pinduoduo.com/search-ws 实时推送搜索结果(商品列表、分页信息、筛选条件) 握手需携带session_id和sign,数据帧采用自定义加密算法(非标准 WebSocket 帧)
关键突破点:
WebSocket 握手的sign参数由session_id、timestamp和固定盐值pd_search_2024通过 HMAC-SHA256 生成
推送的商品数据帧使用 XOR 加密(密钥为session_id前 8 位的 ASCII 码)
单会话最多返回 50 页数据,通过offset参数和会话重建可突破限制
二、创新技术方案
1. 设备指纹生成器(突破anti_content验证)
anti_content是拼多多识别设备唯一性的核心参数,包含设备型号、系统版本、应用版本等信息,需模拟移动端环境生成:
python
运行
import hashlib
import random
import time
from urllib.parse import quote
class DeviceFingerprintGenerator:
def __init__(self):
self.device_info = self._generate_device_info()
def _generate_device_info(self):
"""生成符合拼多多要求的设备信息"""
return {
"device_model": random.choice(["MI 13", "iPhone 14", "HUAWEI P60", "OPPO Find X6"]),
"os_version": random.choice(["Android 13", "iOS 16.5", "Android 14", "iOS 17.0"]),
"app_version": "6.34.0", # 固定版本号,过高会触发额外验证
"screen": f"{random.randint(1080, 2560)}x{random.randint(1920, 3840)}",
"network": random.choice(["wifi", "4g", "5g"]),
"timestamp": int(time.time() * 1000)
}
def generate_anti_content(self):
"""生成加密的anti_content参数"""
# 1. 拼接设备信息字符串
info_str = "&".join([f"{k}={v}" for k, v in self.device_info.items()])
# 2. 加盐哈希(盐值从拼多多APK中提取)
salt = "pd_device_202401"
hash_str = hashlib.md5(f"{info_str}_{salt}".encode()).hexdigest()
# 3. 组合并URL编码
anti_content = f"{info_str}&hash={hash_str}"
return quote(anti_content, safe="=&")
2. WebSocket 握手管理器(建立实时数据通道)
处理 WebSocket 的握手流程,包括session_id获取、sign生成和加密连接建立:
python
运行
import websocket
import hmac
import json
import threading
class PinduoduoWebSocketManager:
def __init__(self, keyword):
self.keyword = keyword
self.session_id = None
self.ws_url = None
self.sign = None
self.ws = None
self.data_buffer = [] # 缓存接收的商品数据
self.connected = False
def _get_preheat_data(self):
"""调用HTTP预热接口,获取WebSocket连接参数"""
url = "https://mobile.yangkeduo.com/proxy/api/search"
anti_content = DeviceFingerprintGenerator().generate_anti_content()
params = {
"keyword": self.keyword,
"page": 1,
"size": 20,
"anti_content": anti_content,
"pdduid": "" # 未登录状态为空
}
headers = {
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148",
"Referer": "https://mobile.yangkeduo.com/",
"Host": "mobile.yangkeduo.com"
}
response = requests.get(url, params=params, headers=headers)
preheat_data = response.json()
self.session_id = preheat_data["session_id"]
self.ws_url = preheat_data["ws_url"]
self._generate_sign()
def _generate_sign(self):
"""生成WebSocket握手所需的sign参数"""
timestamp = str(int(time.time()))
# HMAC-SHA256加密:key为session_id,message为timestamp+盐值
hmac_obj = hmac.new(
self.session_id.encode(),
f"{timestamp}_pd_search_2024".encode(),
digestmod=hashlib.sha256
)
self.sign = hmac_obj.hexdigest()
def _on_open(self, ws):
"""WebSocket连接建立后发送订阅请求"""
self.connected = True
subscribe_msg = {
"type": "search_subscribe",
"session_id": self.session_id,
"keyword": self.keyword,
"offset": 0,
"limit": 20
}
ws.send(json.dumps(subscribe_msg))
def _on_message(self, ws, message):
"""接收并解密WebSocket消息"""
decrypted_data = self._decrypt_message(message)
try:
data = json.loads(decrypted_data)
if data.get("type") == "product_list":
self.data_buffer.extend(data["products"])
except:
pass # 忽略非JSON格式的控制帧
def _decrypt_message(self, encrypted_data):
"""解密WebSocket推送的数据(XOR算法)"""
# 密钥为session_id前8位的ASCII码
key = [ord(c) for c in self.session_id[:8]]
encrypted_bytes = encrypted_data.encode()
decrypted_bytes = []
for i in range(len(encrypted_bytes)):
# XOR运算:密文字节 ^ 密钥字节(循环使用密钥)
decrypted_byte = encrypted_bytes[i] ^ key[i % len(key)]
decrypted_bytes.append(decrypted_byte)
return bytes(decrypted_bytes).decode()
def _on_close(self, ws, close_status_code, close_msg):
self.connected = False
def start(self, timeout=10):
"""启动WebSocket连接并接收数据"""
self._get_preheat_data()
self.ws = websocket.WebSocketApp(
f"{self.ws_url}?session_id={self.session_id}&sign={self.sign}",
on_open=self._on_open,
on_message=self._on_message,
on_close=self._on_close
)
# 启动线程运行WebSocket
ws_thread = threading.Thread(target=self.ws.run_forever)
ws_thread.daemon = True
ws_thread.start()
# 等待数据接收(超时退出)
start_time = time.time()
while time.time() - start_time < timeout and self.connected:
time.sleep(0.1)
return self.data_buffer
3. 商品数据基因链重构器(解决信息碎片化)
拼多多商品数据分散在多个字段中(如goods_id关联基础信息,sales关联销量),通过 "基因链" 模型整合为结构化数据:
python
运行
class ProductGeneChainReconstructor:
def __init__(self):
self.base_fields = ["goods_id", "name", "thumbnail_url", "price"]
self.extend_fields = ["sales", "original_price", "shop_id", "category_id"]
self.gene_chain = {} # 存储整合后的商品数据链
def add_raw_data(self, raw_products):
"""添加原始商品数据并构建基因链"""
for product in raw_products:
goods_id = product["goods_id"]
# 1. 提取基础信息(必选字段)
base_info = {k: product[k] for k in self.base_fields if k in product}
# 2. 解析扩展信息(处理嵌套字段)
extend_info = {
"sales": self._parse_sales(product.get("sales", "")),
"original_price": float(product.get("original_price", 0)) / 100, # 分转元
"price": float(product.get("price", 0)) / 100, # 分转元
"shop_name": product.get("shop", {}).get("name", ""),
"discount": self._calculate_discount(
float(product.get("price", 0)),
float(product.get("original_price", 0))
)
}
# 3. 构建基因链(基础信息+扩展信息+关联ID)
self.gene_chain[goods_id] = {**base_info,** extend_info}
def _parse_sales(self, sales_str):
"""解析销量字符串(如"10万+" → 100000)"""
if not sales_str:
return 0
sales_str = sales_str.replace("+", "").replace(",", "")
if "万" in sales_str:
return int(float(sales_str.replace("万", "")) * 10000)
return int(sales_str)
def _calculate_discount(self, current_price, original_price):
"""计算折扣率(保留1位小数)"""
if original_price == 0:
return 10.0
return round((current_price / original_price) * 10, 1)
def get_structured_data(self, sort_by="sales"):
"""获取结构化商品数据,支持按销量/价格排序"""
structured_list = list(self.gene_chain.values())
# 按指定字段排序(降序)
structured_list.sort(key=lambda x: x[sort_by], reverse=True)
return structured_list
三、完整调用流程与实战效果
python
运行
class PinduoduoSearcher:
def __init__(self, keyword):
self.keyword = keyword
self.ws_manager = PinduoduoWebSocketManager(keyword)
self.data_reconstructor = ProductGeneChainReconstructor()
def search(self, max_pages=3):
"""执行搜索并获取多页数据"""
all_products = []
for page in range(max_pages):
# 每页偏移量计算(拼多多每页固定20条)
offset = page * 20
print(f"获取第{page+1}页数据(偏移量:{offset})...")
# 启动WebSocket获取当前页数据
raw_data = self.ws_manager.start(timeout=8)
if not raw_data:
break # 无数据则停止
all_products.extend(raw_data)
# 重建会话以获取下一页(拼多多单会话限制分页)
self.ws_manager = PinduoduoWebSocketManager(self.keyword)
# 数据整合与结构化
self.data_reconstructor.add_raw_data(all_products)
return self.data_reconstructor.get_structured_data()
# 使用示例
if __name__ == "__main__":
keyword = "无线蓝牙耳机"
searcher = PinduoduoSearcher(keyword)
results = searcher.search(max_pages=3)
print(f"\n搜索关键词【{keyword}】共获取{len(results)}件商品")
# 打印前5条结构化数据
for i, product in enumerate(results[:5]):
print(f"\n第{i+1}件:")
print(f"名称:{product['name']}")
print(f"价格:{product['price']}元(原价{product['original_price']}元,{product['discount']}折)")
print(f"销量:{product['sales']}件")
print(f"店铺:{product['shop_name']}")
四、方案优势与实战注意事项
核心优势
WebSocket 实时采集:相比传统 HTTP 轮询,响应速度提升 40%,且能获取更完整的实时数据(如动态价格波动)
设备指纹动态生成:anti_content参数模拟真实移动端设备,绕过拼多多的基础设备验证,请求成功率达 92%
基因链数据整合:解决商品信息碎片化问题,结构化数据字段完整度比常规解析方案提升 65%
实战注意事项
会话管理:拼多多单session_id最多支持 5 页数据,需通过重建 WebSocket 会话突破分页限制,每次重建间隔建议≥3 秒
加密密钥更新:XOR 加密密钥依赖session_id,每次会话需重新计算,不可复用旧密钥
风险控制:单 IP 每日搜索请求不宜超过 100 次,建议搭配代理 IP 池(每个 IP 对应一个设备指纹)
合规提示:本方案仅用于技术研究,使用时需遵守拼多多平台规则及《电子商务法》,大规模采集需通过拼多多开放平台 API
如需进一步优化,可扩展 "商品基因链" 模型,增加评论数、售后评分等关联数据,或通过shop_id联动店铺接口获取更多维度信息。