基于异步协程与智能解析的大规模影视资源信息聚合Python爬虫实战

引言:影视资源聚合的爬虫技术挑战

在当今数字化娱乐时代,影视资源信息聚合成为用户获取影视内容的重要途径。传统的同步爬虫在应对海量影视网站时面临效率低下、反爬规避困难等问题。本文将深入探讨如何利用Python最新异步协程技术、智能解析算法和分布式架构,构建高效稳定的影视资源聚合爬虫系统。

技术架构概览

核心技术栈

  • 异步框架:aiohttp + asyncio 实现高并发请求

  • 解析引擎:Playwright + BeautifulSoup4 应对动态渲染

  • 智能代理:Rotating proxy pools with automatic retry

  • 数据存储:PostgreSQL + Redis 异步缓存

  • 反反爬策略:请求指纹随机化 + 浏览器特征模拟

完整爬虫系统实现

1. 异步爬虫核心引擎

python

import asyncio import aiohttp import logging from typing import List, Dict, Optional from dataclasses import dataclass from urllib.parse import urljoin, urlparse import random import hashlib from datetime import datetime import json # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @dataclass class MovieResource: """影视资源数据模型""" title: str alternative_titles: List[str] year: int directors: List[str] actors: List[str] genres: List[str] rating: Optional[float] description: str resources: List[Dict] # 播放资源列表 cover_url: str imdb_id: Optional[str] douban_id: Optional[str] update_time: datetime class AsyncMovieSpider: """异步影视爬虫基类""" def __init__(self, max_concurrent: int = 50, request_timeout: int = 30, use_proxy: bool = True): """ 初始化爬虫 Args: max_concurrent: 最大并发数 request_timeout: 请求超时时间 use_proxy: 是否使用代理 """ self.max_concurrent = max_concurrent self.request_timeout = aiohttp.ClientTimeout(total=request_timeout) self.use_proxy = use_proxy self.session = None self.proxy_pool = self._init_proxy_pool() self.headers_pool = self._init_headers_pool() def _init_proxy_pool(self) -> List[str]: """初始化代理池(实际使用时应从API获取)""" return [ 'http://proxy1.example.com:8080', 'http://proxy2.example.com:8080', # 更多代理... ] def _init_headers_pool(self) -> List[Dict]: """初始化请求头池""" user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36', ] return [ { 'User-Agent': ua, 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', 'Accept-Encoding': 'gzip, deflate', 'DNT': '1', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Cache-Control': 'max-age=0', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', } for ua in user_agents ] def _get_random_headers(self) -> Dict: """获取随机请求头""" return random.choice(self.headers_pool) def _get_random_proxy(self) -> Optional[str]: """获取随机代理""" if self.use_proxy and self.proxy_pool: return random.choice(self.proxy_pool) return None async def __aenter__(self): """异步上下文管理器入口""" connector = aiohttp.TCPConnector( limit=self.max_concurrent, ssl=False ) self.session = aiohttp.ClientSession( connector=connector, timeout=self.request_timeout ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" if self.session: await self.session.close() async def fetch(self, url: str, **kwargs) -> Optional[str]: """ 异步获取页面内容 Args: url: 目标URL **kwargs: 其他请求参数 Returns: 页面HTML内容或None """ headers = self._get_random_headers() proxy = self._get_random_proxy() try: async with self.session.get( url, headers=headers, proxy=proxy, **kwargs ) as response: if response.status == 200: content = await response.text() logger.info(f"成功获取 {url}") return content else: logger.warning(f"请求失败 {url}: 状态码 {response.status}") return None except Exception as e: logger.error(f"请求异常 {url}: {str(e)}") return None async def fetch_multiple(self, urls: List[str]) -> Dict[str, Optional[str]]: """ 并发获取多个页面 Args: urls: URL列表 Returns: 字典映射 URL -> 内容 """ tasks = [] for url in urls: task = asyncio.create_task(self.fetch(url)) tasks.append((url, task)) results = {} for url, task in tasks: try: content = await task results[url] = content except Exception as e: logger.error(f"任务异常 {url}: {str(e)}") results[url] = None return results

2. 智能解析器与动态渲染处理

python

from bs4 import BeautifulSoup import re from playwright.async_api import async_playwright import asyncio from typing import Set, Tuple import hashlib class SmartParser: """智能页面解析器""" def __init__(self): self.title_patterns = [ r'<title[^>]*>(.*?)</title>', r'<h1[^>]*>(.*?)</h1>', r'class=".*?title.*?">(.*?)<', r'id=".*?title.*?">(.*?)<' ] async def parse_with_playwright(self, url: str) -> Optional[str]: """ 使用Playwright处理动态渲染页面 Args: url: 目标URL Returns: 渲染后的HTML """ async with async_playwright() as p: # 启动浏览器(可配置无头模式) browser = await p.chromium.launch( headless=True, args=['--disable-blink-features=AutomationControlled'] ) # 创建上下文 context = await browser.new_context( user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', viewport={'width': 1920, 'height': 1080} ) # 创建页面 page = await context.new_page() try: # 导航到页面 await page.goto(url, wait_until='networkidle') # 等待可能的内容加载 await page.wait_for_timeout(2000) # 获取页面内容 content = await page.content() return content except Exception as e: logger.error(f"Playwright渲染失败 {url}: {str(e)}") return None finally: await browser.close() def extract_movie_info(self, html: str, base_url: str) -> Optional[MovieResource]: """ 从HTML中提取影视信息 Args: html: HTML内容 base_url: 基础URL Returns: MovieResource对象或None """ if not html: return None soup = BeautifulSoup(html, 'lxml') # 尝试多种方式提取标题 title = self._extract_title(soup) # 提取其他信息 year = self._extract_year(soup) directors = self._extract_directors(soup) actors = self._extract_actors(soup) genres = self._extract_genres(soup) rating = self._extract_rating(soup) description = self._extract_description(soup) cover_url = self._extract_cover_url(soup, base_url) # 提取播放资源 resources = self._extract_resources(soup, base_url) return MovieResource( title=title, alternative_titles=[], year=year, directors=directors, actors=actors, genres=genres, rating=rating, description=description, resources=resources, cover_url=cover_url, imdb_id=None, douban_id=None, update_time=datetime.now() ) def _extract_title(self, soup: BeautifulSoup) -> str: """提取标题""" # 尝试从多个位置提取 selectors = [ 'h1.title', '.movie-title', '#title', 'meta[property="og:title"]', 'meta[name="title"]' ] for selector in selectors: element = soup.select_one(selector) if element: title = element.get('content') if element.name == 'meta' else element.text if title and len(title.strip()) > 0: return title.strip() # 回退到页面标题 if soup.title: return soup.title.string.strip() return "未知标题" def _extract_resources(self, soup: BeautifulSoup, base_url: str) -> List[Dict]: """提取播放资源""" resources = [] # 查找可能的资源链接 resource_patterns = [ ('magnet', r'magnet:\?xt=urn:btih:[a-zA-Z0-9]{32,40}'), ('ed2k', r'ed2k://\|file\|.*'), ('thunder', r'thunder://.*'), ('ftp', r'ftp://.*\.(mp4|avi|mkv|rmvb)'), ('http', r'https?://.*\.(mp4|avi|mkv|rmvb|flv)') ] # 扫描所有文本内容 all_text = soup.get_text() for resource_type, pattern in resource_patterns: matches = re.findall(pattern, all_text, re.IGNORECASE) for match in matches: resources.append({ 'type': resource_type, 'url': match, 'quality': self._detect_quality(match), 'source': 'direct' }) # 查找播放按钮 play_selectors = [ 'a[href*="play"]', 'a[href*="video"]', 'button[onclick*="play"]', '.play-btn' ] for selector in play_selectors: elements = soup.select(selector) for element in elements: href = element.get('href') or element.get('onclick', '') if href and ('http' in href or '//' in href): full_url = urljoin(base_url, href) resources.append({ 'type': 'play_link', 'url': full_url, 'quality': 'unknown', 'source': 'player' }) return resources def _detect_quality(self, url: str) -> str: """检测资源质量""" quality_patterns = [ (r'4k|2160p|uhd', '4K'), (r'1080p|fhd', '1080P'), (r'720p|hd', '720P'), (r'480p|sd', '480P'), (r'bdrip|bluray', 'BluRay'), (r'web-dl|webdl', 'WEB-DL'), (r'dvdrip|dvd', 'DVD'), (r'ts|tc|cam', '枪版') ] url_lower = url.lower() for pattern, quality in quality_patterns: if re.search(pattern, url_lower): return quality return '未知'

3. 分布式任务调度与去重

python

import redis import pickle import zlib from abc import ABC, abstractmethod from typing import Any, List import asyncio_redis class DistributedScheduler: """分布式任务调度器""" def __init__(self, redis_url: str = 'redis://localhost:6379/0'): """ 初始化调度器 Args: redis_url: Redis连接URL """ self.redis_url = redis_url self.connection_pool = None self.bloom_filter = None async def connect(self): """连接Redis""" self.connection_pool = await asyncio_redis.Pool.create( host='localhost', port=6379, poolsize=10 ) async def add_url(self, queue_name: str, url: str, priority: int = 0): """ 添加URL到队列 Args: queue_name: 队列名称 url: URL priority: 优先级(越高越优先) """ if not await self.is_duplicate(url): score = priority * 1000000 + int(datetime.now().timestamp()) await self.connection_pool.zadd(queue_name, {url: score}) async def get_url(self, queue_name: str) -> Optional[str]: """ 从队列获取URL Args: queue_name: 队列名称 Returns: URL或None """ results = await self.connection_pool.zpopmax(queue_name, count=1) if results and len(results) > 0: url, _ = results[0] return url.decode() if isinstance(url, bytes) else url return None async def is_duplicate(self, url: str) -> bool: """ 检查URL是否重复 Args: url: 要检查的URL Returns: 是否重复 """ # 使用布隆过滤器进行快速去重 url_hash = hashlib.md5(url.encode()).hexdigest() key = f'url:bloom:{url_hash[:2]}' # 检查是否已存在 exists = await self.connection_pool.getbit(key, int(url_hash[2:4], 16)) if not exists: # 标记为已存在 await self.connection_pool.setbit(key, int(url_hash[2:4], 16), 1) return False return True class MovieAggregationCrawler: """影视聚合爬虫主类""" def __init__(self, start_urls: List[str], max_depth: int = 3, worker_count: int = 10): """ 初始化聚合爬虫 Args: start_urls: 起始URL列表 max_depth: 最大爬取深度 worker_count: 工作协程数量 """ self.start_urls = start_urls self.max_depth = max_depth self.worker_count = worker_count self.spider = AsyncMovieSpider(max_concurrent=worker_count) self.parser = SmartParser() self.scheduler = DistributedScheduler() self.results = [] self.visited_urls = set() async def crawl(self): """开始爬取""" logger.info("开始影视资源聚合爬取") # 连接Redis await self.scheduler.connect() # 添加起始URL for url in self.start_urls: await self.scheduler.add_url('pending_urls', url, priority=10) # 创建工作协程 tasks = [] for i in range(self.worker_count): task = asyncio.create_task(self.worker(f'worker-{i}')) tasks.append(task) # 等待所有任务完成 await asyncio.gather(*tasks) logger.info(f"爬取完成,共获取 {len(self.results)} 个影视资源") return self.results async def worker(self, worker_id: str): """ 工作协程 Args: worker_id: 工作者ID """ logger.info(f"启动工作协程 {worker_id}") while True: try: # 获取待处理URL url = await self.scheduler.get_url('pending_urls') if not url: # 短暂等待后重试 await asyncio.sleep(2) continue # 检查是否已访问 if url in self.visited_urls: continue self.visited_urls.add(url) logger.info(f"{worker_id} 正在处理: {url}") # 获取页面内容 html = await self.spider.fetch(url) if not html: continue # 解析页面 movie_info = self.parser.extract_movie_info(html, url) if movie_info: self.results.append(movie_info) logger.info(f"{worker_id} 成功提取: {movie_info.title}") # 提取新链接并加入队列 new_urls = await self.extract_links(html, url) for new_url in new_urls: if new_url not in self.visited_urls: await self.scheduler.add_url('pending_urls', new_url, priority=1) # 礼貌性延迟 await asyncio.sleep(random.uniform(0.5, 1.5)) except Exception as e: logger.error(f"{worker_id} 发生错误: {str(e)}") await asyncio.sleep(5) # 错误后等待 async def extract_links(self, html: str, base_url: str) -> Set[str]: """ 从HTML中提取链接 Args: html: HTML内容 base_url: 基础URL Returns: 链接集合 """ soup = BeautifulSoup(html, 'lxml') links = set() # 查找所有a标签 for a_tag in soup.find_all('a', href=True): href = a_tag['href'] # 过滤无效链接 if not href or href.startswith(('javascript:', 'mailto:', 'tel:')): continue # 转换为绝对URL full_url = urljoin(base_url, href) # 过滤非HTTP链接和特定扩展名 parsed = urlparse(full_url) if parsed.scheme not in ('http', 'https'): continue # 添加有效链接 links.add(full_url) return links

4. 数据存储与导出模块

python

import asyncpg import pandas as pd from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import declarative_base, sessionmaker from sqlalchemy import Column, Integer, String, Float, Text, JSON, DateTime import csv import json as json_lib from typing import List Base = declarative_base() class MovieResourceDB(Base): """影视资源数据库模型""" __tablename__ = 'movie_resources' id = Column(Integer, primary_key=True) title = Column(String(500), nullable=False, index=True) year = Column(Integer, index=True) directors = Column(JSON) actors = Column(JSON) genres = Column(JSON) rating = Column(Float) description = Column(Text) resources = Column(JSON) cover_url = Column(String(1000)) imdb_id = Column(String(100)) douban_id = Column(String(100)) source_url = Column(String(1000)) created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) class DataStorage: """数据存储管理器""" def __init__(self, db_url: str = 'postgresql+asyncpg://user:password@localhost/movies'): """ 初始化数据存储 Args: db_url: 数据库URL """ self.db_url = db_url self.engine = None self.async_session = None async def initialize(self): """初始化数据库连接""" self.engine = create_async_engine( self.db_url, echo=False, pool_size=20, max_overflow=30 ) self.async_session = sessionmaker( self.engine, class_=AsyncSession, expire_on_commit=False ) # 创建表 async with self.engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) async def save_movie(self, movie: MovieResource, source_url: str) -> int: """ 保存电影资源到数据库 Args: movie: 电影资源对象 source_url: 来源URL Returns: 插入的ID """ async with self.async_session() as session: db_movie = MovieResourceDB( title=movie.title, year=movie.year, directors=movie.directors, actors=movie.actors, genres=movie.genres, rating=movie.rating, description=movie.description, resources=movie.resources, cover_url=movie.cover_url, imdb_id=movie.imdb_id, douban_id=movie.douban_id, source_url=source_url ) session.add(db_movie) await session.commit() await session.refresh(db_movie) return db_movie.id async def batch_save(self, movies: List[MovieResource], source_urls: List[str]): """ 批量保存电影资源 Args: movies: 电影资源列表 source_urls: 来源URL列表 """ async with self.async_session() as session: for movie, source_url in zip(movies, source_urls): db_movie = MovieResourceDB( title=movie.title, year=movie.year, directors=movie.directors, actors=movie.actors, genres=movie.genres, rating=movie.rating, description=movie.description, resources=movie.resources, cover_url=movie.cover_url, imdb_id=movie.imdb_id, douban_id=movie.douban_id, source_url=source_url ) session.add(db_movie) await session.commit() def export_to_csv(self, movies: List[MovieResource], filename: str): """ 导出到CSV文件 Args: movies: 电影资源列表 filename: 文件名 """ data = [] for movie in movies: data.append({ '标题': movie.title, '年份': movie.year, '导演': '、'.join(movie.directors), '演员': '、'.join(movie.actors[:5]), # 只取前5个 '类型': '、'.join(movie.genres), '评分': movie.rating or '无', '简介': movie.description[:100] + '...' if len(movie.description) > 100 else movie.description, '资源数量': len(movie.resources), '封面URL': movie.cover_url }) df = pd.DataFrame(data) df.to_csv(filename, index=False, encoding='utf-8-sig') def export_to_json(self, movies: List[MovieResource], filename: str): """ 导出到JSON文件 Args: movies: 电影资源列表 filename: 文件名 """ data = [] for movie in movies: movie_dict = { 'title': movie.title, 'year': movie.year, 'directors': movie.directors, 'actors': movie.actors, 'genres': movie.genres, 'rating': movie.rating, 'description': movie.description, 'resources': movie.resources, 'cover_url': movie.cover_url, 'imdb_id': movie.imdb_id, 'douban_id': movie.douban_id, 'update_time': movie.update_time.isoformat() } data.append(movie_dict) with open(filename, 'w', encoding='utf-8') as f: json_lib.dump(data, f, ensure_ascii=False, indent=2)

5. 主程序与配置

python

import yaml import signal import sys from contextlib import asynccontextmanager class Config: """配置管理类""" def __init__(self, config_file: str = 'config.yaml'): """ 初始化配置 Args: config_file: 配置文件路径 """ self.config_file = config_file self.config = self.load_config() def load_config(self) -> Dict: """加载配置文件""" default_config = { 'crawler': { 'max_concurrent': 50, 'request_timeout': 30, 'max_depth': 3, 'worker_count': 10, 'use_proxy': True, 'delay_range': [0.5, 2.0] }, 'database': { 'url': 'postgresql+asyncpg://user:password@localhost/movies', 'pool_size': 20, 'max_overflow': 30 }, 'redis': { 'url': 'redis://localhost:6379/0', 'poolsize': 10 }, 'websites': [ 'https://www.example-movie-site1.com', 'https://www.example-movie-site2.com', 'https://www.example-movie-site3.com' ], 'output': { 'csv_path': 'movies.csv', 'json_path': 'movies.json' } } try: with open(self.config_file, 'r', encoding='utf-8') as f: user_config = yaml.safe_load(f) # 合并配置 self.merge_config(default_config, user_config) except FileNotFoundError: logger.warning(f"配置文件 {self.config_file} 不存在,使用默认配置") return default_config def merge_config(self, default: Dict, user: Dict, parent_key: str = ''): """递归合并配置""" for key, value in user.items(): if key in default: if isinstance(value, dict) and isinstance(default[key], dict): self.merge_config(default[key], value, f"{parent_key}.{key}") else: default[key] = value else: default[key] = value @asynccontextmanager async def crawler_lifetime(config: Config): """ 爬虫生命周期管理上下文 Args: config: 配置对象 """ # 初始化组件 storage = DataStorage(config.config['database']['url']) await storage.initialize() crawler = MovieAggregationCrawler( start_urls=config.config['websites'], max_depth=config.config['crawler']['max_depth'], worker_count=config.config['crawler']['worker_count'] ) try: yield crawler, storage finally: # 清理资源 logger.info("正在清理资源...") async def main(): """主函数""" # 加载配置 config = Config('movie_crawler_config.yaml') # 设置信号处理 def signal_handler(signum, frame): logger.info("收到停止信号,正在优雅退出...") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) async with crawler_lifetime(config) as (crawler, storage): try: # 开始爬取 movies = await crawler.crawl() # 保存到数据库 if movies: source_urls = [list(crawler.visited_urls)[i % len(crawler.visited_urls)] for i in range(len(movies))] await storage.batch_save(movies, source_urls) logger.info(f"已保存 {len(movies)} 条记录到数据库") # 导出文件 if movies: storage.export_to_csv(movies, config.config['output']['csv_path']) storage.export_to_json(movies, config.config['output']['json_path']) logger.info(f"已导出数据到文件") # 生成统计报告 generate_report(movies) except Exception as e: logger.error(f"爬虫执行失败: {str(e)}", exc_info=True) def generate_report(movies: List[MovieResource]): """生成统计报告""" if not movies: logger.info("未获取到电影数据") return total_movies = len(movies) total_resources = sum(len(movie.resources) for movie in movies) # 统计年份分布 year_dist = {} for movie in movies: if movie.year: year_dist[movie.year] = year_dist.get(movie.year, 0) + 1 # 统计类型分布 genre_dist = {} for movie in movies: for genre in movie.genres: genre_dist[genre] = genre_dist.get(genre, 0) + 1 # 统计资源类型 resource_type_dist = {} for movie in movies: for resource in movie.resources: rtype = resource.get('type', 'unknown') resource_type_dist[rtype] = resource_type_dist.get(rtype, 0) + 1 logger.info("=" * 50) logger.info("爬取统计报告") logger.info("=" * 50) logger.info(f"总电影数量: {total_movies}") logger.info(f"总资源数量: {total_resources}") logger.info(f"平均资源/电影: {total_resources/total_movies:.2f}") logger.info(f"年份分布 (前10): {dict(sorted(year_dist.items(), key=lambda x: x[1], reverse=True)[:10])}") logger.info(f"类型分布 (前10): {dict(sorted(genre_dist.items(), key=lambda x: x[1], reverse=True)[:10])}") logger.info(f"资源类型分布: {resource_type_dist}") logger.info("=" * 50) if __name__ == "__main__": # 运行主程序 asyncio.run(main())

配置文件示例 (config.yaml)

yaml

crawler: max_concurrent: 100 request_timeout: 60 max_depth: 5 worker_count: 20 use_proxy: true delay_range: [0.3, 1.5] database: url: "postgresql+asyncpg://movie_user:password123@localhost:5432/movie_db" pool_size: 50 max_overflow: 100 redis: url: "redis://localhost:6379/0" poolsize: 20 websites: - "https://www.imdb.com" - "https://www.douban.com" - "https://www.netflix.com" - "https://www.hulu.com" - "https://www.amazon.com/primevideo" output: csv_path: "data/movies_export.csv" json_path: "data/movies_export.json" database_backup: "data/movies_backup.sql" proxy: api_url: "https://proxy-provider.com/api/get" api_key: "your_api_key_here" check_interval: 300

部署与优化建议

1. 容器化部署 (Docker)

dockerfile

# Dockerfile FROM python:3.9-slim WORKDIR /app # 安装系统依赖 RUN apt-get update && apt-get install -y \ wget \ gnupg \ && wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \ && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list \ && apt-get update && apt-get install -y \ google-chrome-stable \ fonts-ipafont-gothic \ fonts-wqy-zenhei \ fonts-thai-tlwg \ fonts-kacst \ fonts-freefont-ttf \ libxss1 \ --no-install-recommends \ && rm -rf /var/lib/apt/lists/* # 安装Python依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 复制源代码 COPY . . # 运行爬虫 CMD ["python", "main.py"]

2. 性能优化技巧

  1. 连接池优化:调整数据库和HTTP连接池大小

  2. 缓存策略:使用Redis缓存已解析页面

  3. 智能延迟:根据网站响应动态调整请求间隔

  4. 增量爬取:只爬取更新的内容

  5. 故障转移:实现多个数据源备份

3. 监控与日志

python

# 监控装饰器 def monitor_performance(func): async def wrapper(*args, **kwargs): start_time = time.time() result = await func(*args, **kwargs) end_time = time.time() logger.info(f"{func.__name__} 执行时间: {end_time - start_time:.2f}秒") return result return wrapper # 应用监控 @monitor_performance async def crawl_with_monitoring(): # 爬取逻辑 pass

法律与道德声明

重要提示

  1. 本爬虫代码仅用于技术学习和研究目的

  2. 在实际使用前,请确保:

    • 遵守目标网站的robots.txt协议

    • 尊重网站的服务条款

    • 控制请求频率,避免对目标网站造成负担

    • 仅爬取公开可访问的信息

  3. 不得将本代码用于:

    • 侵犯版权的内容获取

    • 商业盗版资源收集

    • 任何违法活动

总结

本文详细介绍了基于Python最新异步技术的影视资源信息聚合爬虫的实现。该系统采用了多种先进技术:

  1. 异步并发架构:使用asyncio和aiohttp实现高并发请求

  2. 智能解析系统:结合BeautifulSoup和Playwright处理静态和动态页面

  3. 分布式调度:基于Redis的分布式任务队列和去重机制

  4. 数据持久化:支持多种存储后端和导出格式

  5. 容错机制:完善的错误处理和重试逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1123305.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微收付赋能 6000 万实体商家破局转型

在实体行业迭代加速的今天&#xff0c;能沉淀 8 年的品牌愈发珍贵。2016 年&#xff0c;微收付品牌正式创立&#xff0c;从软件开发深耕&#xff0c;到 2024 年广州运营部门的成立&#xff0c;再到 19 家分公司的全国布局&#xff0c;这家搜熊旗下的企业用 8 年时间&#xff0c…

软件I2C总线冲突避免方法:项目应用实例

软件I2C为何总“抽风”&#xff1f;一个真实项目中的总线冲突破局之道你有没有遇到过这种情况&#xff1a;系统明明跑得好好的&#xff0c;突然某个传感器读不到了&#xff0c;OLED屏幕开始花屏&#xff0c;甚至整个I2C总线像死了一样&#xff0c;只能靠复位“续命”&#xff1…

上市公司关键核心技术专利数据(2007-2024)

1824上市公司关键核心技术专利数据&#xff08;2007-2024&#xff09;数据简介企业开展关键核心技术创新面临诸多挑战&#xff0c;主要体现在四个方面&#xff1a;第一&#xff0c;短期与长期的抉择。虽然关键核心技术具有长期价值&#xff0c;但研发周期长、难度大&#xff0c…

用AI自动化生成CONSUL配置管理工具

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个基于CONSUL的微服务配置管理工具&#xff0c;包含服务注册、服务发现、健康检查、KV存储等功能。使用Go语言实现&#xff0c;提供RESTful API接口。要求自动生成完整的项目…

WMT25赛事夺冠模型开源,Hunyuan-MT-7B推动行业进步

Hunyuan-MT-7B&#xff1a;从赛事冠军到开箱即用的翻译引擎 在机器翻译领域&#xff0c;一个长期存在的悖论是&#xff1a;实验室里的顶尖模型&#xff0c;往往难以走出论文&#xff0c;真正服务于真实场景。许多开源模型虽然公布了权重&#xff0c;却要求用户自行搭建推理环境…

效率对比:XART如何将艺术创作时间缩短80%

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个效率对比工具&#xff0c;展示XART与传统编码方式在艺术项目开发中的差异。要求&#xff1a;1&#xff09;提供两种方式实现同一艺术效果的代码量对比&#xff1b;2&#…

【教育观察】一本畅销练习册的25年:揭秘《幼小衔接倒计时99天》如何成为家长心中的“衔接标尺”

作为一名长期观察基础教育领域的记者&#xff0c;我接触过无数的教辅资料&#xff0c;也倾听过众多家长在“幼升小”焦虑期的选择与困惑。在众多产品中&#xff0c;《幼小衔接倒计时99天》 是一个无法忽视的名字。今年&#xff0c;其推出的“25年升级版”再次引发市场关注。它不…

反向海淘的隐藏玩法:你不知道的跨境操作

当我们还在琢磨如何淘到海外好货时&#xff0c;一种逆向操作的跨境购物模式早已悄然崛起 —— 反向海淘。它打破了 “海外商品更吃香” 的固有认知&#xff0c;让中国供应链的高性价比好物通过数字化渠道直达全球消费者&#xff0c;更藏着不少省钱、高效、合规的隐藏玩法&#…

具备远程控制能力的GravityRAT木马攻击Windows、Android和macOS系统

GravityRAT是一种自2016年起就针对政府机构和军事组织的远程访问木马。该恶意软件最初仅针对Windows系统&#xff0c;现已演变为可攻击Windows、Android和macOS系统的跨平台工具。它通过伪造应用程序和精心设计的电子邮件传播&#xff0c;普通用户很难察觉其威胁。恶意软件运作…

企业级Office XML数据处理实战案例

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个企业级数据处理系统&#xff0c;专门处理批量Office 2007 XML格式的财务报表。系统应能自动提取表格数据&#xff0c;进行数据清洗&#xff0c;生成可视化报表&#xff0c…

国际产品本地化提速:Hunyuan-MT-7B处理用户反馈翻译

国际产品本地化提速&#xff1a;Hunyuan-MT-7B处理用户反馈翻译 在跨国业务日益频繁的今天&#xff0c;企业每天都要面对成千上万条来自不同语言背景的用户反馈——从英语差评到阿拉伯语建议&#xff0c;再到藏语的使用困惑。如何快速、准确地理解这些声音&#xff0c;直接决定…

零基础学CMD:用AI助手写出第一个批处理脚本

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 生成一个新手教学项目&#xff1a;1) 从最基础的Hello World脚本开始 2) 逐步讲解常用CMD命令&#xff08;echo, dir, copy等&#xff09;3) 提供5个难度递增的练习任务 4) 每个练…

MCP实验操作指南:3大常见错误与正确执行路径详解

第一章&#xff1a;MCP实验题概述与基础准备在分布式系统与并发编程的学习中&#xff0c;MCP&#xff08;Multiple Consumer Producer&#xff09;实验题是理解线程同步、资源共享与任务调度机制的重要实践环节。该实验模拟多个生产者与消费者共享有限缓冲区的场景&#xff0c;…

新工具可移除Windows 11中的Copilot、Recall及其他AI组件,反抗微软数据收集

微软激进地将人工智能功能集成到 Windows 11 的举措&#xff0c;促使开发者创建了 RemoveWindowsAI 开源项目。该项目旨在从操作系统中移除或禁用不需要的 AI 组件。项目概况RemoveWindowsAI 是一个托管在 GitHub 上的社区驱动工具&#xff0c;可让用户对 Windows 11 中的 AI 功…

PyTorch完全入门指南:从安装到第一个程序

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个交互式学习教程&#xff0c;引导用户完成以下步骤&#xff1a;1) 安装PyTorch&#xff1b;2) 了解张量基本操作&#xff1b;3) 实现一个简单的线性回归模型。教程应采用问…

为什么顶尖企业都在抢有MCP认证的云原生开发者?(行业趋势深度解读)

第一章&#xff1a;MCP云原生开发认证的行业价值在当前企业加速向云原生架构转型的背景下&#xff0c;MCP&#xff08;Microsoft Certified Professional&#xff09;云原生开发认证已成为衡量开发者技术能力的重要标准。该认证不仅验证了开发者在Azure平台上构建、部署和管理云…

JSON零基础入门:从菜鸟到熟练只需30分钟

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个交互式JSON学习应用&#xff0c;包含&#xff1a;1.基础概念动画讲解 2.实时练习环境 3.渐进式难度示例 4.即时错误反馈 5.学习进度跟踪。要求界面友好&#xff0c;使用大…

为什么需要 Auto Scaling详细介绍

一、为什么需要 Auto Scaling&#xff08;背景&#xff09; 1️⃣ 高可用 ≠ 高扩展 多一台服务器 → 提高可用性&#xff08;Availability&#xff09; 流量暴增 → 仍可能因为容量不足而宕机 所以要解决的是 容量问题&#xff08;Scalability&#xff09; 二、两种系统架构对…

【MCP Azure虚拟机部署终极指南】:掌握高效部署的5大核心步骤与避坑策略

第一章&#xff1a;MCP Azure虚拟机部署概述Azure 虚拟机&#xff08;Virtual Machine&#xff09;是微软云平台提供的核心计算服务之一&#xff0c;支持快速部署和扩展 Windows 或 Linux 操作系统的实例。在 MCP&#xff08;Microsoft Certified Professional&#xff09;认证…

快速验证:用GERBER文件检查PCB设计可行性

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个PCB设计快速验证工具&#xff0c;要求&#xff1a;1. 从AD文件一键生成简化版GERBER 2. 提供3D板级可视化预览 3. 自动检测最小线宽、间距等DFM问题 4. 生成可制造性评估报…