Python Web Scraping in Practice: Efficiently Collecting Restaurant Review Data with Selenium and Playwright

Introduction: Data-Driven Insights for the Restaurant Industry

In the digital era, restaurant review data has become an important asset for the food-service industry. From consumer behavior analysis to competitive intelligence, from reputation management to trend forecasting, this data carries substantial commercial value. This article walks through how to use Python scraping techniques, combined with the browser automation tools Selenium and Playwright, to collect restaurant review data efficiently and reliably.

Technology Selection: Why Selenium and Playwright?

Selenium: A Mature Web Automation Framework

Selenium is one of the best-known web automation testing tools, supporting multiple browsers and programming languages. Its strengths include:

  • A mature community and extensive documentation

  • A real browser environment capable of executing JavaScript

  • Support for multiple locator strategies (XPath, CSS selectors, and more) — a minimal sketch follows this list
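
A minimal, hedged sketch of the locator strategies just mentioned; the URL and selectors below are placeholders for illustration only and are not part of this article's crawlers.

python

# Minimal sketch: locating the same element via XPath and via a CSS selector.
# The URL and selectors are placeholders.
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get("https://example.com")

# Explicit wait until the element is present, then locate it two different ways.
wait = WebDriverWait(driver, 10)
heading_by_xpath = wait.until(EC.presence_of_element_located((By.XPATH, "//h1")))
heading_by_css = driver.find_element(By.CSS_SELECTOR, "h1")
print(heading_by_xpath.text, heading_by_css.text)

driver.quit()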

Playwright: A Modern Solution from Microsoft

Playwright is a browser automation library developed by Microsoft, with the following characteristics:

  • Support for the Chromium, Firefox, and WebKit engines

  • An auto-waiting mechanism that reduces explicit waits in code

  • Powerful network interception and mocking capabilities (see the sketch after this list)

  • Generally faster execution than Selenium
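
To make the auto-waiting and interception points concrete, here is a minimal, hedged sketch using Playwright's async API: it blocks image requests via network interception and relies on auto-waiting locators. The URL and selector are placeholders, not part of this article's crawlers.

python

# Minimal sketch: Playwright auto-waiting and request interception.
# The URL and selector are placeholders.
import asyncio
from playwright.async_api import async_playwright

async def demo():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        # Network interception: abort image requests to speed up crawling.
        async def block_images(route):
            await route.abort()

        await page.route("**/*.{png,jpg,jpeg,webp}", block_images)

        await page.goto("https://example.com")
        # Locators auto-wait for the element; no explicit sleep is needed.
        heading = await page.locator("h1").inner_text()
        print(heading)

        await browser.close()

asyncio.run(demo())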

Project Architecture

python

""" 餐厅点评数据采集系统架构 模块化设计,便于维护和扩展 """ import asyncio import json import logging import random import time from datetime import datetime from typing import Dict, List, Optional, Any # 第三方库 from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from playwright.async_api import async_playwright import pandas as pd from bs4 import BeautifulSoup import undetected_chromedriver as uc # 用于绕过反爬 # 数据库相关 import sqlalchemy as sa from sqlalchemy.orm import declarative_base, sessionmaker

Configuration and Logging

python

# Configuration class
class Config:
    """Crawler configuration management."""

    def __init__(self):
        # Target site configuration
        self.target_sites = {
            'dianping': {
                'base_url': 'https://www.dianping.com',
                'search_url': 'https://www.dianping.com/search/keyword/{city_id}_{keyword}',
                'max_pages': 50
            },
            'meituan': {
                'base_url': 'https://www.meituan.com',
                'search_url': 'https://sz.meituan.com/meishi/{keyword}/',
                'max_pages': 100
            }
        }

        # Crawler settings
        self.request_delay = random.uniform(1, 3)  # delay between requests (seconds)
        self.timeout = 30          # timeout (seconds)
        self.headless = False      # run browsers headless
        self.max_retries = 3       # maximum retries

        # Proxy settings (placeholder addresses)
        self.proxy_pool = [
            'http://proxy1:port',
            'http://proxy2:port',
        ]

        # User-agent pool
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]


# Logging configuration
def setup_logger():
    """Configure the logging system."""
    logger = logging.getLogger('restaurant_crawler')
    logger.setLevel(logging.INFO)

    # File handler
    file_handler = logging.FileHandler(
        f'restaurant_crawler_{datetime.now().strftime("%Y%m%d")}.log',
        encoding='utf-8'
    )
    file_handler.setLevel(logging.INFO)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # Formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger


logger = setup_logger()

Database Model Design

python

# SQLAlchemy data models
Base = declarative_base()


class Restaurant(Base):
    """Basic restaurant information."""
    __tablename__ = 'restaurants'

    id = sa.Column(sa.String(100), primary_key=True)   # platform-specific ID
    platform = sa.Column(sa.String(50))                # platform name
    name = sa.Column(sa.String(200))
    address = sa.Column(sa.String(500))
    phone = sa.Column(sa.String(50))
    avg_price = sa.Column(sa.Float)
    avg_rating = sa.Column(sa.Float)
    review_count = sa.Column(sa.Integer)
    category = sa.Column(sa.String(100))
    city = sa.Column(sa.String(50))
    latitude = sa.Column(sa.Float)
    longitude = sa.Column(sa.Float)
    created_at = sa.Column(sa.DateTime, default=datetime.now)
    updated_at = sa.Column(sa.DateTime, default=datetime.now, onupdate=datetime.now)


class Review(Base):
    """User review."""
    __tablename__ = 'reviews'

    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    restaurant_id = sa.Column(sa.String(100), sa.ForeignKey('restaurants.id'))
    user_id = sa.Column(sa.String(100))
    user_name = sa.Column(sa.String(100))
    rating = sa.Column(sa.Float)
    content = sa.Column(sa.Text)
    review_time = sa.Column(sa.DateTime)
    like_count = sa.Column(sa.Integer)
    reply_count = sa.Column(sa.Integer)
    photos = sa.Column(sa.JSON)  # list of photo URLs
    created_at = sa.Column(sa.DateTime, default=datetime.now)


# Database connection
def init_database(connection_string='sqlite:///restaurant_data.db'):
    """Initialize the database and return a session."""
    engine = sa.create_engine(connection_string)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    return Session()

Core Crawler Classes

1. Selenium-Based Crawler

python

class SeleniumCrawler:
    """Crawler implementation based on Selenium."""

    def __init__(self, config: Config):
        self.config = config
        self.driver = None
        self.session = init_database()

    def init_driver(self):
        """Initialize the browser driver."""
        options = webdriver.ChromeOptions()

        # Anti-bot-detection settings
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)

        # Random user agent
        options.add_argument(f'user-agent={random.choice(self.config.user_agents)}')

        # Other settings
        if self.config.headless:
            options.add_argument('--headless')
        options.add_argument('--disable-gpu')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--window-size=1920,1080')

        # Use undetected-chromedriver to evade detection
        self.driver = uc.Chrome(options=options)

        # Run a CDP command to hide the automation flag
        self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': '''
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                });
            '''
        })

        # Implicit wait
        self.driver.implicitly_wait(self.config.timeout)
        logger.info("Selenium driver initialized")

    def crawl_dianping_restaurants(self, city_id: int, keyword: str):
        """Crawl the Dianping restaurant listing."""
        try:
            url = self.config.target_sites['dianping']['search_url'].format(
                city_id=city_id,
                keyword=keyword
            )
            self.driver.get(url)
            time.sleep(self.config.request_delay)

            restaurants = []
            page_num = 1
            max_pages = self.config.target_sites['dianping']['max_pages']

            while page_num <= max_pages:
                logger.info(f"Crawling listing page {page_num}")

                # Wait for the content to load
                wait = WebDriverWait(self.driver, 20)
                wait.until(
                    EC.presence_of_element_located((By.CLASS_NAME, "shop-list"))
                )

                # Parse the restaurant list
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                shop_items = soup.find_all('div', {'class': 'shop-list'})

                for item in shop_items:
                    restaurant_data = self._parse_dianping_shop(item)
                    if restaurant_data:
                        restaurants.append(restaurant_data)
                        self._save_restaurant(restaurant_data)

                # Check whether there is a next page
                try:
                    next_btn = self.driver.find_element(
                        By.CSS_SELECTOR, '.next:not([style*="display: none"])'
                    )
                    if next_btn.is_enabled():
                        next_btn.click()
                        page_num += 1
                        time.sleep(random.uniform(2, 4))
                    else:
                        break
                except NoSuchElementException:
                    logger.info("Reached the last page")
                    break

            return restaurants

        except Exception as e:
            logger.error(f"Error while crawling: {str(e)}")
            return []

    def _parse_dianping_shop(self, shop_item) -> Optional[Dict]:
        """Parse a single restaurant entry."""
        try:
            # Restaurant ID
            shop_link = shop_item.find('a', {'data-click-name': 'shop_title_click'})
            shop_id = shop_link['href'].split('/')[-1] if shop_link else None

            # Restaurant name
            name_tag = shop_item.find('h4')
            name = name_tag.get_text(strip=True) if name_tag else None

            # Rating
            rating_tag = shop_item.find('span', {'class': 'sml-rank-stars'})
            rating = float(rating_tag['title'].replace('星', '')) if rating_tag else None

            # Review count
            review_tag = shop_item.find('span', {'class': 'review-num'})
            review_count = int(review_tag.b.get_text()) if review_tag else 0

            # Average price per person
            price_tag = shop_item.find('span', {'class': 'mean-price'})
            avg_price = price_tag.get_text(strip=True) if price_tag else None

            # Address
            addr_tag = shop_item.find('span', {'class': 'addr'})
            address = addr_tag.get_text(strip=True) if addr_tag else None

            return {
                'id': f'dianping_{shop_id}',
                'platform': 'dianping',
                'name': name,
                'avg_rating': rating,          # matches the Restaurant.avg_rating column
                'review_count': review_count,
                'avg_price': avg_price,
                'address': address,
                'source_url': f'https://www.dianping.com/shop/{shop_id}'
            }

        except Exception as e:
            logger.warning(f"Failed to parse restaurant entry: {str(e)}")
            return None

    def crawl_restaurant_reviews(self, shop_id: str):
        """Crawl the detailed reviews of a restaurant."""
        try:
            review_url = f'https://www.dianping.com/shop/{shop_id}/review_all'
            self.driver.get(review_url)
            time.sleep(self.config.request_delay)

            reviews = []
            page_num = 1

            while True:
                logger.info(f"Crawling review page {page_num} of restaurant {shop_id}")

                # Wait for reviews to load
                wait = WebDriverWait(self.driver, 20)
                wait.until(
                    EC.presence_of_element_located((By.CLASS_NAME, "reviews-items"))
                )

                # Parse reviews
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                review_items = soup.find_all('div', {'class': 'main-review'})

                for item in review_items:
                    review_data = self._parse_dianping_review(item, shop_id)
                    if review_data:
                        reviews.append(review_data)
                        self._save_review(review_data)

                # Scroll to load more
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)

                # Check whether there is more content ("没有更多了" = "nothing more to load")
                if "没有更多了" in self.driver.page_source:
                    logger.info("All reviews loaded")
                    break

                page_num += 1
                if page_num > 100:  # safety limit
                    break

            return reviews

        except Exception as e:
            logger.error(f"Failed to crawl reviews: {str(e)}")
            return []

    def _parse_dianping_review(self, review_item, shop_id: str) -> Optional[Dict]:
        """Parse a single review."""
        try:
            # User information
            user_tag = review_item.find('a', {'class': 'name'})
            user_name = user_tag.get_text(strip=True) if user_tag else "匿名用户"
            user_id = user_tag['href'].split('/')[-1] if user_tag else None

            # Rating
            rating_tag = review_item.find('span', {'class': 'sml-rank-stars'})
            rating = float(rating_tag['title'].replace('星', '')) if rating_tag else None

            # Review content
            content_tag = review_item.find('div', {'class': 'review-words'})
            content = content_tag.get_text(strip=True) if content_tag else ""

            # Review time
            time_tag = review_item.find('span', {'class': 'time'})
            review_time = datetime.strptime(
                time_tag.get_text(strip=True), '%Y-%m-%d %H:%M'
            ) if time_tag else datetime.now()

            # Like count
            like_tag = review_item.find('a', {'class': 'zan'})
            like_count = int(like_tag.get_text(strip=True)) if like_tag else 0

            return {
                'restaurant_id': f'dianping_{shop_id}',
                'user_id': user_id,
                'user_name': user_name,
                'rating': rating,
                'content': content,
                'review_time': review_time,
                'like_count': like_count,
                'platform': 'dianping'
            }

        except Exception as e:
            logger.warning(f"Failed to parse review: {str(e)}")
            return None

    def _save_restaurant(self, data: Dict):
        """Persist restaurant data to the database."""
        try:
            # Keep only keys that map to Restaurant columns (drops e.g. 'source_url')
            columns = {c.name for c in Restaurant.__table__.columns}
            restaurant = Restaurant(**{k: v for k, v in data.items() if k in columns})
            self.session.merge(restaurant)  # merge() gives upsert semantics
            self.session.commit()
            logger.info(f"Saved restaurant: {data['name']}")
        except Exception as e:
            logger.error(f"Failed to save restaurant: {str(e)}")
            self.session.rollback()

    def _save_review(self, data: Dict):
        """Persist review data to the database."""
        try:
            columns = {c.name for c in Review.__table__.columns}
            review = Review(**{k: v for k, v in data.items() if k in columns})
            self.session.add(review)
            self.session.commit()
            logger.info(f"Saved review by: {data['user_name']}")
        except Exception as e:
            logger.error(f"Failed to save review: {str(e)}")
            self.session.rollback()

    def close(self):
        """Clean up resources."""
        if self.driver:
            self.driver.quit()
        self.session.close()

2. Asynchronous Playwright-Based Crawler

python

class PlaywrightCrawler:
    """Asynchronous crawler based on Playwright."""

    def __init__(self, config: Config):
        self.config = config
        self.playwright = None
        self.browser = None
        self.context = None

    async def init_browser(self):
        """Initialize the Playwright browser."""
        self.playwright = await async_playwright().start()

        # Launch the browser
        self.browser = await self.playwright.chromium.launch(
            headless=self.config.headless,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--no-sandbox',
                f'--user-agent={random.choice(self.config.user_agents)}'
            ]
        )

        # Create a browser context
        self.context = await self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent=random.choice(self.config.user_agents)
        )

        # Hide the automation flag before any page script runs
        await self.context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """)

        logger.info("Playwright browser initialized")

    async def crawl_meituan_restaurants(self, city: str, keyword: str):
        """Crawl Meituan restaurant data."""
        try:
            page = await self.context.new_page()

            # Navigate to the search page
            url = f'https://{city}.meituan.com/meishi/{keyword}/'
            await page.goto(url, wait_until='networkidle')

            restaurants = []
            page_num = 1

            while page_num <= self.config.target_sites['meituan']['max_pages']:
                logger.info(f"Crawling Meituan page {page_num}")

                # Wait for the restaurant list to load
                await page.wait_for_selector('.poi-tile', timeout=20000)

                # Collect the restaurant tiles
                restaurant_items = await page.query_selector_all('.poi-tile')

                for item in restaurant_items:
                    restaurant_data = await self._parse_meituan_restaurant(item)
                    if restaurant_data:
                        restaurants.append(restaurant_data)

                # Check whether there is a next page
                next_button = await page.query_selector('a[aria-label="下一页"]')
                if next_button and await next_button.is_enabled():
                    await next_button.click()
                    await page.wait_for_timeout(random.randint(2000, 4000))
                    page_num += 1
                else:
                    break

            await page.close()
            return restaurants

        except Exception as e:
            logger.error(f"Failed to crawl Meituan data: {str(e)}")
            return []

    async def _parse_meituan_restaurant(self, item) -> Optional[Dict]:
        """Parse a Meituan restaurant tile."""
        try:
            # Restaurant ID
            link_element = await item.query_selector('a[data-poi]')
            if not link_element:
                return None

            data_poi = await link_element.get_attribute('data-poi')
            poi_data = json.loads(data_poi) if data_poi else {}

            # Restaurant name
            name_element = await item.query_selector('.title')
            name = await name_element.inner_text() if name_element else None

            # Rating
            rating_element = await item.query_selector('.star .star-num')
            rating_text = await rating_element.inner_text() if rating_element else None
            rating = float(rating_text) if rating_text else None

            # Review count
            review_element = await item.query_selector('.comment')
            review_text = await review_element.inner_text() if review_element else ''
            review_count = int(review_text.replace('条评价', '')) if '条评价' in review_text else 0

            # Average price per person
            price_element = await item.query_selector('.mean-price')
            price_text = await price_element.inner_text() if price_element else ''
            avg_price = float(price_text.replace('人均¥', '')) if '人均¥' in price_text else None

            return {
                'id': f'meituan_{poi_data.get("id", "")}',
                'platform': 'meituan',
                'name': name,
                'avg_rating': rating,          # matches the Restaurant.avg_rating column
                'review_count': review_count,
                'avg_price': avg_price,
                'address': poi_data.get('address', ''),
                'latitude': poi_data.get('lat'),
                'longitude': poi_data.get('lng'),
                'source_url': f'https://www.meituan.com/meishi/{poi_data.get("id", "")}/'
            }

        except Exception as e:
            logger.warning(f"Failed to parse Meituan restaurant: {str(e)}")
            return None

    async def crawl_with_parallel(self, tasks: List):
        """Run multiple crawl tasks concurrently."""
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def close(self):
        """Clean up resources."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()

Anti-Scraping Strategies and Countermeasures

python

class AntiAntiCrawler:
    """Countermeasures against anti-scraping defenses."""

    @staticmethod
    def rotate_user_agent():
        """Rotate the user agent."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/537.36',
        ]
        return random.choice(user_agents)

    @staticmethod
    def random_delay(min_seconds=1, max_seconds=5):
        """Sleep for a random interval."""
        time.sleep(random.uniform(min_seconds, max_seconds))

    @staticmethod
    def simulate_human_behavior(driver):
        """Simulate human-like behavior."""
        # Random scrolling
        scroll_height = driver.execute_script("return document.body.scrollHeight")
        scroll_times = random.randint(1, 3)

        for _ in range(scroll_times):
            scroll_to = random.randint(0, scroll_height)
            driver.execute_script(f"window.scrollTo(0, {scroll_to});")
            time.sleep(random.uniform(0.5, 2))

        # Random mouse movement
        action = webdriver.ActionChains(driver)
        for _ in range(random.randint(2, 5)):
            x_offset = random.randint(-100, 100)
            y_offset = random.randint(-100, 100)
            action.move_by_offset(x_offset, y_offset).perform()
            time.sleep(random.uniform(0.1, 0.5))

    @staticmethod
    def use_proxy_rotation():
        """Pick a proxy from the rotation pool (placeholder addresses)."""
        proxies = [
            'http://proxy1.example.com:8080',
            'http://proxy2.example.com:8080',
            'http://proxy3.example.com:8080',
        ]
        return random.choice(proxies)
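
The Config.proxy_pool defined earlier and the helpers above are not wired into any driver in this article's code. Below is a hedged sketch of one way user-agent and proxy rotation could be applied to a Selenium Chrome session; the proxy address returned by use_proxy_rotation() and the URL are placeholders.

python

# Hedged sketch: applying the rotation helpers to a Selenium Chrome session.
# The proxy returned by use_proxy_rotation() is a placeholder address.
from selenium import webdriver

def build_rotated_driver() -> webdriver.Chrome:
    options = webdriver.ChromeOptions()
    options.add_argument(f'user-agent={AntiAntiCrawler.rotate_user_agent()}')
    options.add_argument(f'--proxy-server={AntiAntiCrawler.use_proxy_rotation()}')
    return webdriver.Chrome(options=options)

driver = build_rotated_driver()
driver.get("https://example.com")      # placeholder URL
AntiAntiCrawler.random_delay(1, 3)     # pause before the next request
# AntiAntiCrawler.simulate_human_behavior(driver) could be called here as well
driver.quit()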

Data Storage and Export

python

class DataExporter:
    """Data export utilities."""

    def __init__(self, session):
        self.session = session

    def export_to_csv(self, filename='restaurant_data.csv'):
        """Export to a CSV file."""
        try:
            # Query the data
            query = self.session.query(Restaurant)
            df = pd.read_sql(query.statement, self.session.bind)

            # Save as CSV
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            logger.info(f"Data exported to {filename}")
        except Exception as e:
            logger.error(f"CSV export failed: {str(e)}")

    def export_to_excel(self, filename='restaurant_data.xlsx'):
        """Export to an Excel file."""
        try:
            # Query restaurant data
            restaurants_query = self.session.query(Restaurant)
            restaurants_df = pd.read_sql(restaurants_query.statement, self.session.bind)

            # Query review data
            reviews_query = self.session.query(Review)
            reviews_df = pd.read_sql(reviews_query.statement, self.session.bind)

            # Write to Excel
            with pd.ExcelWriter(filename, engine='openpyxl') as writer:
                restaurants_df.to_excel(writer, sheet_name='餐厅信息', index=False)
                reviews_df.to_excel(writer, sheet_name='用户评价', index=False)

            logger.info(f"Data exported to {filename}")
        except Exception as e:
            logger.error(f"Excel export failed: {str(e)}")

    def export_to_json(self, filename='restaurant_data.json'):
        """Export to a JSON file."""
        try:
            data = {
                'restaurants': [],
                'reviews': []
            }

            # Restaurant records
            restaurants = self.session.query(Restaurant).all()
            for r in restaurants:
                data['restaurants'].append({
                    'name': r.name,
                    'address': r.address,
                    'rating': r.avg_rating,
                    'price': r.avg_price,
                    'review_count': r.review_count
                })

            # Review records
            reviews = self.session.query(Review).limit(1000).all()  # cap the number of records
            for rev in reviews:
                data['reviews'].append({
                    'user': rev.user_name,
                    'rating': rev.rating,
                    'content': rev.content[:200],  # truncate content
                    'time': rev.review_time.isoformat() if rev.review_time else None
                })

            # Write JSON
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)

            logger.info(f"Data exported to {filename}")
        except Exception as e:
            logger.error(f"JSON export failed: {str(e)}")

Main Program and Scheduling

python

class CrawlerScheduler:
    """Crawler scheduler."""

    def __init__(self):
        self.config = Config()
        self.selenium_crawler = None
        self.playwright_crawler = None
        self.session = init_database()

    def run_selenium_crawler(self):
        """Run the Selenium crawler."""
        logger.info("Starting the Selenium crawler...")
        try:
            self.selenium_crawler = SeleniumCrawler(self.config)
            self.selenium_crawler.init_driver()

            # Crawl Dianping data (example: hotpot in Shanghai)
            logger.info("Crawling Dianping data...")
            restaurants = self.selenium_crawler.crawl_dianping_restaurants(
                city_id=1,        # city ID for Shanghai
                keyword='火锅'
            )
            logger.info(f"Crawled {len(restaurants)} restaurants")

            # Pick a few restaurants at random and crawl their reviews
            sample_size = min(5, len(restaurants))
            sample_restaurants = random.sample(restaurants, sample_size)

            for restaurant in sample_restaurants:
                shop_id = restaurant['id'].replace('dianping_', '')
                logger.info(f"Crawling reviews for {restaurant['name']}...")
                reviews = self.selenium_crawler.crawl_restaurant_reviews(shop_id)
                logger.info(f"Crawled {len(reviews)} reviews")

                # Random delay to avoid hitting the site too frequently
                time.sleep(random.uniform(3, 6))

        except Exception as e:
            logger.error(f"Selenium crawler failed: {str(e)}")
        finally:
            if self.selenium_crawler:
                self.selenium_crawler.close()

    async def run_playwright_crawler(self):
        """Run the Playwright crawler."""
        logger.info("Starting the Playwright crawler...")
        try:
            self.playwright_crawler = PlaywrightCrawler(self.config)
            await self.playwright_crawler.init_browser()

            # Crawl Meituan data (example: Cantonese food in Shenzhen)
            logger.info("Crawling Meituan data...")
            restaurants = await self.playwright_crawler.crawl_meituan_restaurants(
                city='sz',        # Shenzhen
                keyword='粤菜'
            )
            logger.info(f"Crawled {len(restaurants)} restaurants")

            # Persist the data to the database
            columns = {c.name for c in Restaurant.__table__.columns}
            for restaurant in restaurants:
                try:
                    restaurant_obj = Restaurant(
                        **{k: v for k, v in restaurant.items() if k in columns}
                    )
                    self.session.merge(restaurant_obj)
                except Exception as e:
                    logger.warning(f"Failed to save restaurant: {str(e)}")
            self.session.commit()

        except Exception as e:
            logger.error(f"Playwright crawler failed: {str(e)}")
        finally:
            if self.playwright_crawler:
                await self.playwright_crawler.close()

    def export_data(self):
        """Export all collected data."""
        logger.info("Exporting data...")
        exporter = DataExporter(self.session)

        # Export in multiple formats
        exporter.export_to_csv()
        exporter.export_to_excel()
        exporter.export_to_json()

        logger.info("Data export finished")

    def run(self, use_selenium=True, use_playwright=True):
        """Main entry point."""
        try:
            if use_selenium:
                self.run_selenium_crawler()

            if use_playwright:
                # Run the async crawler
                asyncio.run(self.run_playwright_crawler())

            # Export the data
            self.export_data()
            logger.info("All crawl tasks finished!")

        except KeyboardInterrupt:
            logger.info("Crawl interrupted by user")
        except Exception as e:
            logger.error(f"Crawl task failed: {str(e)}")
        finally:
            self.session.close()


# Example run
if __name__ == "__main__":
    # Create the scheduler and run it
    scheduler = CrawlerScheduler()

    # Runtime settings
    scheduler.config.headless = True      # headless mode is recommended in production
    scheduler.config.request_delay = 2    # increase the request delay

    # Run the crawlers
    scheduler.run(
        use_selenium=True,
        use_playwright=True
    )

Advanced Features and Optimization

1. Distributed Crawler Architecture

python

import redis
import pickle
from multiprocessing import Process, Queue


class DistributedCrawler:
    """Distributed crawler manager."""

    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

    def create_tasks(self, cities, keywords):
        """Create crawl tasks."""
        for city in cities:
            for keyword in keywords:
                task = {
                    'city': city,
                    'keyword': keyword,
                    'platform': 'dianping'  # or 'meituan'
                }
                self.task_queue.put(task)

        # Add one end-of-work marker per worker
        for _ in range(self.num_workers):
            self.task_queue.put(None)

    def worker(self, worker_id):
        """Worker process."""
        crawler = SeleniumCrawler(Config())
        crawler.init_driver()

        while True:
            task = self.task_queue.get()
            if task is None:
                break

            try:
                logger.info(f"Worker {worker_id} handling task: {task}")

                if task['platform'] == 'dianping':
                    results = crawler.crawl_dianping_restaurants(
                        city_id=task['city'],
                        keyword=task['keyword']
                    )
                else:
                    # Other platforms would be handled here
                    results = []

                # Store the results in Redis
                result_key = f"result:{worker_id}:{time.time()}"
                self.redis_client.setex(
                    result_key,
                    3600,  # expire after 1 hour
                    pickle.dumps(results)
                )

            except Exception as e:
                logger.error(f"Worker {worker_id} task failed: {str(e)}")

        crawler.close()

    def run_distributed(self):
        """Run the distributed crawl."""
        processes = []

        # Start the worker processes
        for i in range(self.num_workers):
            p = Process(target=self.worker, args=(i,))
            p.start()
            processes.append(p)

        # Wait for all processes to finish
        for p in processes:
            p.join()
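
The workers above only write pickled results into Redis; nothing in this article reads them back. A hedged sketch of a collector that gathers those results, assuming the same "result:{worker_id}:{timestamp}" key format used in worker():

python

# Hedged sketch: collecting worker results back out of Redis.
# Assumes the "result:*" key format used by DistributedCrawler.worker().
import pickle

import redis

def collect_results(host='localhost', port=6379, db=0):
    client = redis.Redis(host=host, port=port, db=db)
    all_restaurants = []
    # scan_iter iterates keys incrementally instead of blocking Redis with KEYS
    for key in client.scan_iter(match='result:*'):
        payload = client.get(key)
        if payload:
            all_restaurants.extend(pickle.loads(payload))
    return all_restaurants

if __name__ == '__main__':
    restaurants = collect_results()
    print(f"Collected {len(restaurants)} restaurant records from Redis")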

2. Data Quality Monitoring

python

class DataQualityMonitor:
    """Data quality monitoring."""

    @staticmethod
    def check_completeness(df, required_columns):
        """Check that all required columns exist."""
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            logger.warning(f"Missing columns: {missing_columns}")
            return False
        return True

    @staticmethod
    def check_consistency(df, column_rules):
        """Check data consistency against per-column rules."""
        violations = []

        for column, rule in column_rules.items():
            if column in df.columns:
                if rule == 'not_null':
                    null_count = df[column].isnull().sum()
                    if null_count > 0:
                        violations.append(f"{column}: {null_count} null values")
                elif isinstance(rule, tuple) and rule[0] == 'range':
                    min_val, max_val = rule[1], rule[2]
                    out_of_range = ((df[column] < min_val) | (df[column] > max_val)).sum()
                    if out_of_range > 0:
                        violations.append(f"{column}: {out_of_range} values out of range")

        if violations:
            logger.warning(f"Data consistency warnings: {violations}")
            return False
        return True

    @staticmethod
    def generate_quality_report(df, dataset_name):
        """Generate a data quality report."""
        report = {
            'dataset': dataset_name,
            'total_records': len(df),
            'columns': list(df.columns),
            'missing_values': df.isnull().sum().to_dict(),
            'data_types': df.dtypes.astype(str).to_dict(),
            'basic_stats': {}
        }

        # Basic statistics for numeric columns
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
        for col in numeric_cols:
            report['basic_stats'][col] = {
                'mean': float(df[col].mean()),
                'std': float(df[col].std()),
                'min': float(df[col].min()),
                'max': float(df[col].max())
            }

        return report
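
A short, hedged usage sketch tying the monitor to the CSV produced by DataExporter; the column names follow the Restaurant model above, and the 0–5 rating range is an assumption rather than something defined elsewhere in the article.

python

# Hedged usage sketch for DataQualityMonitor on the exported restaurants table.
# The 0-5 rating range is an assumed rule.
import json

import pandas as pd

df = pd.read_csv('restaurant_data.csv')   # produced by DataExporter.export_to_csv()

DataQualityMonitor.check_completeness(
    df, required_columns=['id', 'name', 'avg_rating', 'review_count']
)
DataQualityMonitor.check_consistency(df, column_rules={
    'name': 'not_null',
    'avg_rating': ('range', 0, 5),
})
report = DataQualityMonitor.generate_quality_report(df, dataset_name='restaurants')
print(json.dumps(report, ensure_ascii=False, indent=2, default=str))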

Deployment and Maintenance Recommendations

1. Containerized Deployment

dockerfile

# Dockerfile
FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    unzip \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Chrome
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable

# Set the working directory
WORKDIR /app

# Copy the dependency file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the data directory
RUN mkdir -p /app/data

# Run the crawler
CMD ["python", "main.py"]

2. Scheduled Task Configuration

python

# scheduler.py
import schedule
import time
from datetime import datetime


def daily_crawl_job():
    """Daily crawl job."""
    logger.info(f"Starting the daily crawl job: {datetime.now()}")

    scheduler = CrawlerScheduler()
    scheduler.config.headless = True

    # Crawl several categories
    categories = ['火锅', '川菜', '日料', '西餐', '烧烤']

    for category in categories:
        logger.info(f"Crawling category: {category}")
        try:
            # Category-specific crawl logic goes here
            pass
        except Exception as e:
            logger.error(f"Failed to crawl {category}: {str(e)}")

        time.sleep(60)  # delay between categories

    logger.info(f"Daily crawl job finished: {datetime.now()}")


# Schedule the job for 2:00 AM every day
schedule.every().day.at("02:00").do(daily_crawl_job)

if __name__ == "__main__":
    logger.info("Scheduled crawler started...")
    while True:
        schedule.run_pending()
        time.sleep(60)  # check once per minute

Summary and Best Practices

Through the walkthrough above, we have assembled a complete restaurant review data collection system. The key points are summarized below:

Key technical takeaways:

  1. Dual-engine support: combine the strengths of Selenium and Playwright to fit different scenarios

  2. Anti-scraping strategies: use several techniques to work around site defenses

  3. Data quality: implement validation and cleaning for the collected data

  4. Extensible architecture: support distributed deployment and task scheduling

Best-practice recommendations:

  1. Respect robots.txt: honor each site's crawling policy (a minimal check is sketched after this list)

  2. Limit request rates: avoid putting excessive load on target sites

  3. Anonymize data: handle personal information responsibly

  4. Error recovery: implement robust error-handling and retry mechanisms

  5. Monitoring and alerting: build out a complete monitoring pipeline
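
A minimal, hedged sketch of the robots.txt check from item 1; the user-agent string is a placeholder, and the example URL simply reuses the Dianping search pattern from the configuration above.

python

# Minimal sketch: consult robots.txt before scheduling a crawl.
# The user-agent name is a placeholder.
from urllib import robotparser
from urllib.parse import urlparse

def is_allowed(url: str, user_agent: str = 'restaurant-crawler') -> bool:
    parts = urlparse(url)
    parser = robotparser.RobotFileParser()
    parser.set_url(f"{parts.scheme}://{parts.netloc}/robots.txt")
    parser.read()
    return parser.can_fetch(user_agent, url)

if __name__ == '__main__':
    target = 'https://www.dianping.com/search/keyword/1_火锅'
    if not is_allowed(target):
        print(f"robots.txt disallows crawling {target}; skipping")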
