引言:数据驱动下的餐饮行业洞察
在数字化时代,餐厅点评数据已成为餐饮行业的重要资产。从消费者行为分析到竞争情报,从口碑管理到趋势预测,这些数据蕴含着巨大的商业价值。本文将详细介绍如何使用Python爬虫技术,结合Selenium与Playwright两种浏览器自动化工具,高效、稳定地采集餐厅点评数据。
技术选型:为什么选择Selenium与Playwright?
Selenium:成熟的Web自动化框架
Selenium是业界应用最广泛的Web自动化测试工具之一,支持多种浏览器和编程语言。它的优势在于:
成熟的社区支持和丰富的文档
真实的浏览器环境,能够执行JavaScript
支持多种定位策略(XPath、CSS选择器等)
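下面是一个最小示例(仅作演示,其中的页面地址与选择器均为假设,并非本文目标站点),展示Selenium的显式等待以及CSS选择器、XPath两种定位方式:

```python
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
try:
    driver.get("https://example.com")  # 示例地址,非真实目标站点

    # 显式等待:最多等待10秒,直到目标元素出现在DOM中
    wait = WebDriverWait(driver, 10)
    heading = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "h1")))
    print(heading.text)

    # XPath 定位示例:找出页面上所有带 href 的链接
    links = driver.find_elements(By.XPATH, "//a[@href]")
    print(f"页面共有 {len(links)} 个链接")
finally:
    driver.quit()
```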
Playwright:微软出品的现代化方案
Playwright是Microsoft开发的浏览器自动化库,具有以下特点:
支持Chromium、Firefox和WebKit三大引擎
自动等待机制,减少代码中的显式等待
强大的网络拦截和模拟功能
通常比Selenium更快的执行速度
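下面是一个最小示意(页面地址为假设,仅展示API用法),演示Playwright的自动等待与网络拦截:`page.route` 可以在请求层面屏蔽图片等资源,`inner_text` 等操作会自动等待元素就绪:

```python
import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        # 网络拦截示例:屏蔽图片请求,加快页面加载
        await page.route("**/*.{png,jpg,jpeg}", lambda route: route.abort())

        await page.goto("https://example.com")  # 示例地址,非真实目标站点

        # 自动等待:inner_text / click 等操作会自动等待元素可用
        title = await page.inner_text("h1")
        print(title)

        await browser.close()

asyncio.run(main())
```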
项目架构设计
```python
"""
餐厅点评数据采集系统架构
模块化设计,便于维护和扩展
"""
import asyncio
import json
import logging
import random
import time
from datetime import datetime
from typing import Dict, List, Optional, Any

# 第三方库
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from playwright.async_api import async_playwright
import pandas as pd
from bs4 import BeautifulSoup
import undetected_chromedriver as uc  # 用于绕过反爬

# 数据库相关
import sqlalchemy as sa
from sqlalchemy.orm import declarative_base, sessionmaker
```
配置与日志系统
```python
# 配置类
class Config:
    """爬虫配置管理"""

    def __init__(self):
        # 目标网站配置
        self.target_sites = {
            'dianping': {
                'base_url': 'https://www.dianping.com',
                'search_url': 'https://www.dianping.com/search/keyword/{city_id}_{keyword}',
                'max_pages': 50
            },
            'meituan': {
                'base_url': 'https://www.meituan.com',
                'search_url': 'https://sz.meituan.com/meishi/{keyword}/',
                'max_pages': 100
            }
        }

        # 爬虫配置
        self.request_delay = random.uniform(1, 3)  # 请求延迟
        self.timeout = 30                          # 超时时间
        self.headless = False                      # 是否无头模式
        self.max_retries = 3                       # 最大重试次数

        # 代理配置
        self.proxy_pool = [
            'http://proxy1:port',
            'http://proxy2:port',
        ]

        # 用户代理池
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]


# 日志配置
def setup_logger():
    """配置日志系统"""
    logger = logging.getLogger('restaurant_crawler')
    logger.setLevel(logging.INFO)

    # 文件处理器
    file_handler = logging.FileHandler(
        f'restaurant_crawler_{datetime.now().strftime("%Y%m%d")}.log',
        encoding='utf-8'
    )
    file_handler.setLevel(logging.INFO)

    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # 格式化
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger


logger = setup_logger()
```
数据库模型设计
```python
# SQLAlchemy 数据模型
Base = declarative_base()


class Restaurant(Base):
    """餐厅基本信息"""
    __tablename__ = 'restaurants'

    id = sa.Column(sa.String(100), primary_key=True)   # 平台ID
    platform = sa.Column(sa.String(50))                # 平台名称
    name = sa.Column(sa.String(200))
    address = sa.Column(sa.String(500))
    phone = sa.Column(sa.String(50))
    avg_price = sa.Column(sa.Float)
    avg_rating = sa.Column(sa.Float)
    review_count = sa.Column(sa.Integer)
    category = sa.Column(sa.String(100))
    city = sa.Column(sa.String(50))
    latitude = sa.Column(sa.Float)
    longitude = sa.Column(sa.Float)
    created_at = sa.Column(sa.DateTime, default=datetime.now)
    updated_at = sa.Column(sa.DateTime, default=datetime.now, onupdate=datetime.now)


class Review(Base):
    """用户评价"""
    __tablename__ = 'reviews'

    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    restaurant_id = sa.Column(sa.String(100), sa.ForeignKey('restaurants.id'))
    user_id = sa.Column(sa.String(100))
    user_name = sa.Column(sa.String(100))
    rating = sa.Column(sa.Float)
    content = sa.Column(sa.Text)
    review_time = sa.Column(sa.DateTime)
    like_count = sa.Column(sa.Integer)
    reply_count = sa.Column(sa.Integer)
    photos = sa.Column(sa.JSON)  # 存储照片URL列表
    created_at = sa.Column(sa.DateTime, default=datetime.now)


# 数据库连接
def init_database(connection_string='sqlite:///restaurant_data.db'):
    """初始化数据库"""
    engine = sa.create_engine(connection_string)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    return Session()
```
核心爬虫类实现
1. 基于Selenium的爬虫
```python
class SeleniumCrawler:
    """使用Selenium的爬虫实现"""

    def __init__(self, config: Config):
        self.config = config
        self.driver = None
        self.session = init_database()

    def init_driver(self):
        """初始化浏览器驱动"""
        options = webdriver.ChromeOptions()

        # 反反爬虫配置
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)

        # 随机用户代理
        options.add_argument(f'user-agent={random.choice(self.config.user_agents)}')

        # 其他配置
        if self.config.headless:
            options.add_argument('--headless')
            options.add_argument('--disable-gpu')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--window-size=1920,1080')

        # 使用undetected-chromedriver绕过检测
        self.driver = uc.Chrome(options=options)

        # 执行CDP命令,绕过自动化检测
        self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': '''
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                });
            '''
        })

        # 设置隐式等待
        self.driver.implicitly_wait(self.config.timeout)
        logger.info("Selenium驱动初始化完成")

    def crawl_dianping_restaurants(self, city_id: int, keyword: str):
        """爬取大众点评餐厅列表"""
        try:
            url = self.config.target_sites['dianping']['search_url'].format(
                city_id=city_id, keyword=keyword
            )
            self.driver.get(url)
            time.sleep(self.config.request_delay)

            restaurants = []
            page_num = 1
            max_pages = self.config.target_sites['dianping']['max_pages']

            while page_num <= max_pages:
                logger.info(f"正在爬取第 {page_num} 页")

                # 等待内容加载
                wait = WebDriverWait(self.driver, 20)
                wait.until(
                    EC.presence_of_element_located((By.CLASS_NAME, "shop-list"))
                )

                # 解析餐厅列表
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                shop_items = soup.find_all('div', {'class': 'shop-list'})

                for item in shop_items:
                    restaurant_data = self._parse_dianping_shop(item)
                    if restaurant_data:
                        restaurants.append(restaurant_data)
                        self._save_restaurant(restaurant_data)

                # 检查是否有下一页
                try:
                    next_btn = self.driver.find_element(
                        By.CSS_SELECTOR, '.next:not([style*="display: none"])'
                    )
                    if next_btn.is_enabled():
                        next_btn.click()
                        page_num += 1
                        time.sleep(random.uniform(2, 4))
                    else:
                        break
                except NoSuchElementException:
                    logger.info("已到达最后一页")
                    break

            return restaurants

        except Exception as e:
            logger.error(f"爬取过程中出错: {str(e)}")
            return []

    def _parse_dianping_shop(self, shop_item) -> Optional[Dict]:
        """解析单个餐厅信息"""
        try:
            # 提取餐厅ID
            shop_link = shop_item.find('a', {'data-click-name': 'shop_title_click'})
            shop_id = shop_link['href'].split('/')[-1] if shop_link else None

            # 餐厅名称
            name_tag = shop_item.find('h4')
            name = name_tag.get_text(strip=True) if name_tag else None

            # 评分
            rating_tag = shop_item.find('span', {'class': 'sml-rank-stars'})
            rating = float(rating_tag['title'].replace('星', '')) if rating_tag else None

            # 评价数量
            review_tag = shop_item.find('span', {'class': 'review-num'})
            review_count = int(review_tag.b.get_text()) if review_tag else 0

            # 人均价格(页面上是"¥xx"形式的文本,提取数字以匹配Float列)
            price_tag = shop_item.find('span', {'class': 'mean-price'})
            price_text = price_tag.get_text(strip=True) if price_tag else ''
            digits = ''.join(c for c in price_text if c.isdigit() or c == '.')
            avg_price = float(digits) if digits else None

            # 地址
            addr_tag = shop_item.find('span', {'class': 'addr'})
            address = addr_tag.get_text(strip=True) if addr_tag else None

            return {
                'id': f'dianping_{shop_id}',
                'platform': 'dianping',
                'name': name,
                'avg_rating': rating,        # 与 Restaurant.avg_rating 列对应
                'review_count': review_count,
                'avg_price': avg_price,
                'address': address,
                'source_url': f'https://www.dianping.com/shop/{shop_id}'
            }

        except Exception as e:
            logger.warning(f"解析餐厅信息失败: {str(e)}")
            return None

    def crawl_restaurant_reviews(self, shop_id: str):
        """爬取餐厅的详细评价"""
        try:
            review_url = f'https://www.dianping.com/shop/{shop_id}/review_all'
            self.driver.get(review_url)
            time.sleep(self.config.request_delay)

            reviews = []
            page_num = 1

            while True:
                logger.info(f"爬取餐厅 {shop_id} 的评价第 {page_num} 页")

                # 等待评价加载
                wait = WebDriverWait(self.driver, 20)
                wait.until(
                    EC.presence_of_element_located((By.CLASS_NAME, "reviews-items"))
                )

                # 解析评价
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                review_items = soup.find_all('div', {'class': 'main-review'})

                for item in review_items:
                    review_data = self._parse_dianping_review(item, shop_id)
                    if review_data:
                        reviews.append(review_data)
                        self._save_review(review_data)

                # 滚动加载更多
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)

                # 检查是否有更多内容
                if "没有更多了" in self.driver.page_source:
                    logger.info("评价已全部加载完毕")
                    break

                page_num += 1
                if page_num > 100:  # 安全限制
                    break

            return reviews

        except Exception as e:
            logger.error(f"爬取评价失败: {str(e)}")
            return []

    def _parse_dianping_review(self, review_item, shop_id: str) -> Optional[Dict]:
        """解析单个评价"""
        try:
            # 用户信息
            user_tag = review_item.find('a', {'class': 'name'})
            user_name = user_tag.get_text(strip=True) if user_tag else "匿名用户"
            user_id = user_tag['href'].split('/')[-1] if user_tag else None

            # 评分
            rating_tag = review_item.find('span', {'class': 'sml-rank-stars'})
            rating = float(rating_tag['title'].replace('星', '')) if rating_tag else None

            # 评价内容
            content_tag = review_item.find('div', {'class': 'review-words'})
            content = content_tag.get_text(strip=True) if content_tag else ""

            # 评价时间
            time_tag = review_item.find('span', {'class': 'time'})
            review_time = datetime.strptime(
                time_tag.get_text(strip=True), '%Y-%m-%d %H:%M'
            ) if time_tag else datetime.now()

            # 点赞数
            like_tag = review_item.find('a', {'class': 'zan'})
            like_count = int(like_tag.get_text(strip=True)) if like_tag else 0

            return {
                'restaurant_id': f'dianping_{shop_id}',
                'user_id': user_id,
                'user_name': user_name,
                'rating': rating,
                'content': content,
                'review_time': review_time,
                'like_count': like_count,
                'platform': 'dianping'
            }

        except Exception as e:
            logger.warning(f"解析评价失败: {str(e)}")
            return None

    def _save_restaurant(self, data: Dict):
        """保存餐厅数据到数据库"""
        try:
            # 只保留模型中定义的列,避免 source_url 等额外键导致构造失败
            columns = Restaurant.__table__.columns.keys()
            restaurant = Restaurant(**{k: v for k, v in data.items() if k in columns})
            self.session.merge(restaurant)  # 使用merge实现upsert
            self.session.commit()
            logger.info(f"保存餐厅数据: {data['name']}")
        except Exception as e:
            logger.error(f"保存餐厅数据失败: {str(e)}")
            self.session.rollback()

    def _save_review(self, data: Dict):
        """保存评价数据到数据库"""
        try:
            # 同样过滤掉 Review 模型中不存在的键(如 platform)
            columns = Review.__table__.columns.keys()
            review = Review(**{k: v for k, v in data.items() if k in columns})
            self.session.add(review)
            self.session.commit()
            logger.info(f"保存评价数据: {data['user_name']}")
        except Exception as e:
            logger.error(f"保存评价数据失败: {str(e)}")
            self.session.rollback()

    def close(self):
        """清理资源"""
        if self.driver:
            self.driver.quit()
        self.session.close()
```
2. 基于Playwright的异步爬虫
```python
class PlaywrightCrawler:
    """使用Playwright的异步爬虫"""

    def __init__(self, config: Config):
        self.config = config
        self.playwright = None
        self.browser = None
        self.context = None

    async def init_browser(self):
        """初始化Playwright浏览器"""
        self.playwright = await async_playwright().start()

        # 启动浏览器
        self.browser = await self.playwright.chromium.launch(
            headless=self.config.headless,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--no-sandbox',
                f'--user-agent={random.choice(self.config.user_agents)}'
            ]
        )

        # 创建上下文
        self.context = await self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent=random.choice(self.config.user_agents)
        )

        # 注入初始化脚本,隐藏自动化特征
        await self.context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """)

        logger.info("Playwright浏览器初始化完成")

    async def crawl_meituan_restaurants(self, city: str, keyword: str):
        """爬取美团餐厅数据"""
        try:
            page = await self.context.new_page()

            # 导航到搜索页面
            url = f'https://{city}.meituan.com/meishi/{keyword}/'
            await page.goto(url, wait_until='networkidle')

            restaurants = []
            page_num = 1

            while page_num <= self.config.target_sites['meituan']['max_pages']:
                logger.info(f"正在爬取美团第 {page_num} 页")

                # 等待餐厅列表加载
                await page.wait_for_selector('.poi-tile', timeout=20000)

                # 获取餐厅列表
                restaurant_items = await page.query_selector_all('.poi-tile')

                for item in restaurant_items:
                    restaurant_data = await self._parse_meituan_restaurant(item)
                    if restaurant_data:
                        restaurants.append(restaurant_data)

                # 检查是否有下一页
                next_button = await page.query_selector('a[aria-label="下一页"]')
                if next_button and await next_button.is_enabled():
                    await next_button.click()
                    await page.wait_for_timeout(random.randint(2000, 4000))
                    page_num += 1
                else:
                    break

            await page.close()
            return restaurants

        except Exception as e:
            logger.error(f"爬取美团数据失败: {str(e)}")
            return []

    async def _parse_meituan_restaurant(self, item) -> Optional[Dict]:
        """解析美团餐厅信息"""
        try:
            # 获取餐厅ID
            link_element = await item.query_selector('a[data-poi]')
            if not link_element:
                return None

            data_poi = await link_element.get_attribute('data-poi')
            poi_data = json.loads(data_poi) if data_poi else {}

            # 餐厅名称
            name_element = await item.query_selector('.title')
            name = await name_element.inner_text() if name_element else None

            # 评分
            rating_element = await item.query_selector('.star .star-num')
            rating_text = await rating_element.inner_text() if rating_element else None
            rating = float(rating_text) if rating_text else None

            # 评价数量
            review_element = await item.query_selector('.comment')
            review_text = await review_element.inner_text() if review_element else ''
            review_count = int(review_text.replace('条评价', '')) if '条评价' in review_text else 0

            # 人均价格
            price_element = await item.query_selector('.mean-price')
            price_text = await price_element.inner_text() if price_element else ''
            avg_price = float(price_text.replace('人均¥', '')) if '人均¥' in price_text else None

            return {
                'id': f'meituan_{poi_data.get("id", "")}',
                'platform': 'meituan',
                'name': name,
                'avg_rating': rating,        # 与 Restaurant.avg_rating 列对应
                'review_count': review_count,
                'avg_price': avg_price,
                'address': poi_data.get('address', ''),
                'latitude': poi_data.get('lat'),
                'longitude': poi_data.get('lng'),
                'source_url': f'https://www.meituan.com/meishi/{poi_data.get("id", "")}/'
            }

        except Exception as e:
            logger.warning(f"解析美团餐厅失败: {str(e)}")
            return None

    async def crawl_with_parallel(self, tasks: List):
        """并行爬取多个任务"""
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def close(self):
        """清理资源"""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
```
反爬虫策略与应对方案
```python
class AntiAntiCrawler:
    """反反爬虫策略"""

    @staticmethod
    def rotate_user_agent():
        """轮换用户代理"""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/537.36',
        ]
        return random.choice(user_agents)

    @staticmethod
    def random_delay(min_seconds=1, max_seconds=5):
        """随机延迟"""
        time.sleep(random.uniform(min_seconds, max_seconds))

    @staticmethod
    def simulate_human_behavior(driver):
        """模拟人类行为"""
        # 随机滚动
        scroll_height = driver.execute_script("return document.body.scrollHeight")
        scroll_times = random.randint(1, 3)
        for _ in range(scroll_times):
            scroll_to = random.randint(0, scroll_height)
            driver.execute_script(f"window.scrollTo(0, {scroll_to});")
            time.sleep(random.uniform(0.5, 2))

        # 随机鼠标移动
        action = webdriver.ActionChains(driver)
        for _ in range(random.randint(2, 5)):
            x_offset = random.randint(-100, 100)
            y_offset = random.randint(-100, 100)
            action.move_by_offset(x_offset, y_offset).perform()
            time.sleep(random.uniform(0.1, 0.5))

    @staticmethod
    def use_proxy_rotation():
        """代理轮换"""
        proxies = [
            'http://proxy1.example.com:8080',
            'http://proxy2.example.com:8080',
            'http://proxy3.example.com:8080',
        ]
        return random.choice(proxies)
```
数据存储与导出
```python
class DataExporter:
    """数据导出工具"""

    def __init__(self, session):
        self.session = session

    def export_to_csv(self, filename='restaurant_data.csv'):
        """导出为CSV文件"""
        try:
            # 查询数据
            query = self.session.query(Restaurant)
            df = pd.read_sql(query.statement, self.session.bind)

            # 保存为CSV
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            logger.info(f"数据已导出到 {filename}")
        except Exception as e:
            logger.error(f"导出CSV失败: {str(e)}")

    def export_to_excel(self, filename='restaurant_data.xlsx'):
        """导出为Excel文件"""
        try:
            # 查询餐厅数据
            restaurants_query = self.session.query(Restaurant)
            restaurants_df = pd.read_sql(restaurants_query.statement, self.session.bind)

            # 查询评价数据
            reviews_query = self.session.query(Review)
            reviews_df = pd.read_sql(reviews_query.statement, self.session.bind)

            # 写入Excel
            with pd.ExcelWriter(filename, engine='openpyxl') as writer:
                restaurants_df.to_excel(writer, sheet_name='餐厅信息', index=False)
                reviews_df.to_excel(writer, sheet_name='用户评价', index=False)

            logger.info(f"数据已导出到 {filename}")
        except Exception as e:
            logger.error(f"导出Excel失败: {str(e)}")

    def export_to_json(self, filename='restaurant_data.json'):
        """导出为JSON文件"""
        try:
            data = {
                'restaurants': [],
                'reviews': []
            }

            # 获取餐厅数据
            restaurants = self.session.query(Restaurant).all()
            for r in restaurants:
                data['restaurants'].append({
                    'name': r.name,
                    'address': r.address,
                    'rating': r.avg_rating,
                    'price': r.avg_price,
                    'review_count': r.review_count
                })

            # 获取评价数据
            reviews = self.session.query(Review).limit(1000).all()  # 限制数量
            for rev in reviews:
                data['reviews'].append({
                    'user': rev.user_name,
                    'rating': rev.rating,
                    'content': rev.content[:200],  # 截断内容
                    'time': rev.review_time.isoformat() if rev.review_time else None
                })

            # 写入JSON
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)

            logger.info(f"数据已导出到 {filename}")
        except Exception as e:
            logger.error(f"导出JSON失败: {str(e)}")
```
主程序与调度系统
```python
class CrawlerScheduler:
    """爬虫调度器"""

    def __init__(self):
        self.config = Config()
        self.selenium_crawler = None
        self.playwright_crawler = None
        self.session = init_database()

    def run_selenium_crawler(self):
        """运行Selenium爬虫"""
        logger.info("启动Selenium爬虫...")
        try:
            self.selenium_crawler = SeleniumCrawler(self.config)
            self.selenium_crawler.init_driver()

            # 爬取大众点评数据(示例:上海火锅)
            logger.info("开始爬取大众点评数据...")
            restaurants = self.selenium_crawler.crawl_dianping_restaurants(
                city_id=1,  # 上海的城市ID
                keyword='火锅'
            )
            logger.info(f"成功爬取 {len(restaurants)} 家餐厅")

            # 随机选择几家餐厅爬取详细评价
            sample_size = min(5, len(restaurants))
            sample_restaurants = random.sample(restaurants, sample_size)

            for restaurant in sample_restaurants:
                shop_id = restaurant['id'].replace('dianping_', '')
                logger.info(f"爬取餐厅 {restaurant['name']} 的评价...")
                reviews = self.selenium_crawler.crawl_restaurant_reviews(shop_id)
                logger.info(f"爬取到 {len(reviews)} 条评价")

                # 随机延迟,避免请求过于频繁
                time.sleep(random.uniform(3, 6))

        except Exception as e:
            logger.error(f"Selenium爬虫运行失败: {str(e)}")
        finally:
            if self.selenium_crawler:
                self.selenium_crawler.close()

    async def run_playwright_crawler(self):
        """运行Playwright爬虫"""
        logger.info("启动Playwright爬虫...")
        try:
            self.playwright_crawler = PlaywrightCrawler(self.config)
            await self.playwright_crawler.init_browser()

            # 爬取美团数据(示例:深圳粤菜)
            logger.info("开始爬取美团数据...")
            restaurants = await self.playwright_crawler.crawl_meituan_restaurants(
                city='sz',  # 深圳
                keyword='粤菜'
            )
            logger.info(f"成功爬取 {len(restaurants)} 家餐厅")

            # 保存数据到数据库
            columns = Restaurant.__table__.columns.keys()
            for restaurant in restaurants:
                try:
                    # 过滤掉模型中不存在的键(如 source_url)
                    restaurant_obj = Restaurant(
                        **{k: v for k, v in restaurant.items() if k in columns}
                    )
                    self.session.merge(restaurant_obj)
                except Exception as e:
                    logger.warning(f"保存餐厅数据失败: {str(e)}")

            self.session.commit()

        except Exception as e:
            logger.error(f"Playwright爬虫运行失败: {str(e)}")
        finally:
            if self.playwright_crawler:
                await self.playwright_crawler.close()

    def export_data(self):
        """导出所有数据"""
        logger.info("开始导出数据...")
        exporter = DataExporter(self.session)

        # 导出为多种格式
        exporter.export_to_csv()
        exporter.export_to_excel()
        exporter.export_to_json()

        logger.info("数据导出完成")

    def run(self, use_selenium=True, use_playwright=True):
        """主运行方法"""
        try:
            if use_selenium:
                self.run_selenium_crawler()

            if use_playwright:
                # 运行异步爬虫
                asyncio.run(self.run_playwright_crawler())

            # 导出数据
            self.export_data()

            logger.info("爬虫任务全部完成!")

        except KeyboardInterrupt:
            logger.info("用户中断爬虫任务")
        except Exception as e:
            logger.error(f"爬虫任务失败: {str(e)}")
        finally:
            self.session.close()


# 运行示例
if __name__ == "__main__":
    # 创建调度器并运行
    scheduler = CrawlerScheduler()

    # 配置运行参数
    scheduler.config.headless = True     # 生产环境建议使用无头模式
    scheduler.config.request_delay = 2   # 增加请求延迟

    # 运行爬虫
    scheduler.run(
        use_selenium=True,
        use_playwright=True
    )
```
高级功能与优化
1. 分布式爬虫架构
```python
import redis
import pickle
from multiprocessing import Process, Queue


class DistributedCrawler:
    """分布式爬虫管理器"""

    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

    def create_tasks(self, cities, keywords):
        """创建爬虫任务"""
        for city in cities:
            for keyword in keywords:
                task = {
                    'city': city,
                    'keyword': keyword,
                    'platform': 'dianping'  # 或 'meituan'
                }
                self.task_queue.put(task)

        # 添加结束标记
        for _ in range(self.num_workers):
            self.task_queue.put(None)

    def worker(self, worker_id):
        """工作进程"""
        crawler = SeleniumCrawler(Config())
        crawler.init_driver()

        while True:
            task = self.task_queue.get()
            if task is None:
                break

            try:
                logger.info(f"Worker {worker_id} 处理任务: {task}")

                results = []  # 默认空结果,避免其他平台分支下变量未定义
                if task['platform'] == 'dianping':
                    results = crawler.crawl_dianping_restaurants(
                        city_id=task['city'],
                        keyword=task['keyword']
                    )
                else:
                    # 处理其他平台
                    pass

                # 存储结果到Redis
                result_key = f"result:{worker_id}:{time.time()}"
                self.redis_client.setex(
                    result_key,
                    3600,  # 1小时过期
                    pickle.dumps(results)
                )

            except Exception as e:
                logger.error(f"Worker {worker_id} 任务失败: {str(e)}")

        crawler.close()

    def run_distributed(self):
        """运行分布式爬虫"""
        processes = []

        # 启动工作进程
        for i in range(self.num_workers):
            p = Process(target=self.worker, args=(i,))
            p.start()
            processes.append(p)

        # 等待所有进程完成
        for p in processes:
            p.join()
```
2. 数据质量监控
```python
class DataQualityMonitor:
    """数据质量监控"""

    @staticmethod
    def check_completeness(df, required_columns):
        """检查数据完整性"""
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            logger.warning(f"缺失列: {missing_columns}")
            return False
        return True

    @staticmethod
    def check_consistency(df, column_rules):
        """检查数据一致性"""
        violations = []

        for column, rule in column_rules.items():
            if column in df.columns:
                if rule == 'not_null':
                    null_count = df[column].isnull().sum()
                    if null_count > 0:
                        violations.append(f"{column}: {null_count} 个空值")
                elif isinstance(rule, tuple) and rule[0] == 'range':
                    min_val, max_val = rule[1], rule[2]
                    out_of_range = ((df[column] < min_val) | (df[column] > max_val)).sum()
                    if out_of_range > 0:
                        violations.append(f"{column}: {out_of_range} 个值超出范围")

        if violations:
            logger.warning(f"数据一致性警告: {violations}")
            return False
        return True

    @staticmethod
    def generate_quality_report(df, dataset_name):
        """生成质量报告"""
        report = {
            'dataset': dataset_name,
            'total_records': len(df),
            'columns': list(df.columns),
            'missing_values': df.isnull().sum().to_dict(),
            'data_types': df.dtypes.astype(str).to_dict(),
            'basic_stats': {}
        }

        # 数值列的基本统计
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
        for col in numeric_cols:
            report['basic_stats'][col] = {
                'mean': float(df[col].mean()),
                'std': float(df[col].std()),
                'min': float(df[col].min()),
                'max': float(df[col].max())
            }

        return report
```
部署与维护建议
1. 容器化部署
```dockerfile
# Dockerfile
FROM python:3.9-slim

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    unzip \
    curl \
    && rm -rf /var/lib/apt/lists/*

# 安装Chrome
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建数据目录
RUN mkdir -p /app/data

# 运行爬虫
CMD ["python", "main.py"]
```
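上面的镜像构建依赖 requirements.txt。根据本文示例代码中用到的库,一个未固定版本号的依赖清单可能类似于下面这样(仅供参考,实际版本请按环境锁定):

```text
selenium
undetected-chromedriver
playwright
beautifulsoup4
pandas
openpyxl
sqlalchemy
redis
schedule
```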
2. 定时任务配置
```python
# scheduler.py
import schedule
import time
from datetime import datetime

# 假设 logger 与 CrawlerScheduler 已在主模块中定义并可导入,例如:
# from main import logger, CrawlerScheduler


def daily_crawl_job():
    """每日爬虫任务"""
    logger.info(f"开始每日爬虫任务: {datetime.now()}")

    scheduler = CrawlerScheduler()
    scheduler.config.headless = True

    # 爬取不同分类
    categories = ['火锅', '川菜', '日料', '西餐', '烧烤']

    for category in categories:
        logger.info(f"爬取分类: {category}")
        try:
            # 这里可以添加具体的爬取逻辑
            pass
        except Exception as e:
            logger.error(f"爬取 {category} 失败: {str(e)}")

        time.sleep(60)  # 分类间延迟

    logger.info(f"每日爬虫任务完成: {datetime.now()}")


# 设置定时任务
schedule.every().day.at("02:00").do(daily_crawl_job)  # 每天凌晨2点运行

if __name__ == "__main__":
    logger.info("定时爬虫调度器启动...")
    while True:
        schedule.run_pending()
        time.sleep(60)  # 每分钟检查一次
```
总结与最佳实践
通过本文的详细介绍,我们构建了一个完整的餐厅点评数据采集系统。以下是一些关键总结:
技术要点总结:
双引擎支持:结合Selenium和Playwright的优势,适应不同场景
反爬虫策略:使用多种技术绕过网站防护
数据质量:实现完整的数据验证和清洗流程
可扩展架构:支持分布式部署和任务调度
最佳实践建议:
遵守robots.txt:尊重网站的爬虫政策(与请求限速结合的示例见文末)
限制请求频率:避免对目标网站造成过大压力
数据脱敏:处理个人隐私信息
错误恢复:实现健壮的错误处理机制
监控告警:建立完整的监控体系
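针对"遵守robots.txt"与"限制请求频率"两条建议,下面给出一个假设性的最小示意(域名与抓取间隔均为示例,并非某个平台的真实配置),展示如何在发起请求前检查 robots.txt 并做简单限速:

```python
# 抓取前检查 robots.txt,并用最小间隔控制请求频率(示意)
import time
import urllib.robotparser


class PoliteFetcher:
    def __init__(self, base_url: str, user_agent: str = "my-crawler", min_interval: float = 3.0):
        self.user_agent = user_agent
        self.min_interval = min_interval   # 两次请求之间的最小间隔(秒),假设值
        self._last_request = 0.0
        self.robots = urllib.robotparser.RobotFileParser()
        self.robots.set_url(base_url.rstrip("/") + "/robots.txt")
        self.robots.read()

    def allowed(self, url: str) -> bool:
        """该URL是否允许被当前 user_agent 抓取"""
        return self.robots.can_fetch(self.user_agent, url)

    def wait_turn(self):
        """按最小间隔限速,避免对目标站点造成压力"""
        elapsed = time.time() - self._last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last_request = time.time()


# 用法示意(example.com 为占位域名)
# fetcher = PoliteFetcher("https://example.com", min_interval=3)
# if fetcher.allowed("https://example.com/some/page"):
#     fetcher.wait_turn()
#     ...  # 在这里发起请求或驱动浏览器访问
```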