#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Analysis tool for Amazon crawl results.

Aggregates statistics over the JSON result files produced by the crawler
(success rate, item counts, price/review/star distributions) and can
export the collected items to a single CSV file.
"""

import json
import logging
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Canonical display order for the price buckets. sorted() on the bucket
# labels would yield a meaningless alphabetical order.
PRICE_BUCKETS = ('<$10', '$10-$50', '$50-$100', '≥$100')


class ResultAnalyzer:
    """Aggregates statistics over crawled Amazon JSON result files."""

    def __init__(self, results_dir: str = "amazon_results"):
        """Initialize the analyzer.

        Args:
            results_dir: Path to the directory holding the ``*.json`` results.

        Raises:
            FileNotFoundError: If ``results_dir`` does not exist.
        """
        self.results_dir = Path(results_dir)
        if not self.results_dir.exists():
            logger.error(f"结果目录不存在: {self.results_dir}")
            raise FileNotFoundError(f"Directory not found: {self.results_dir}")

    def analyze(self):
        """Run the full analysis: scan files, print stats, save a report."""
        logger.info("=" * 70)
        logger.info("Amazon爬取结果分析")
        logger.info("=" * 70)
        logger.info(f"结果目录: {self.results_dir.absolute()}")

        # Collect every JSON file in the results directory.
        json_files = list(self.results_dir.glob("*.json"))
        logger.info(f"JSON文件数量: {len(json_files)}")
        if not json_files:
            logger.warning("未找到任何JSON文件")
            return

        stats = {
            'total_files': len(json_files),
            'successful': 0,
            'failed': 0,
            'total_items': 0,
            'queries': [],            # query strings seen across files
            'items_per_query': [],    # item count per successful file
            'price_ranges': defaultdict(int),
            'avg_reviews': [],        # per-item review counts (> 0 only)
            'avg_stars': []           # per-item star ratings (> 0 only)
        }

        logger.info("\n正在分析文件...")
        for json_file in json_files:
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if data.get('error_code') == '0000':
                    self._accumulate(data, stats)
                    # Count success only after accumulation succeeds, so a
                    # file that raises mid-processing is counted exactly
                    # once (as failed), never as both successful and failed.
                    stats['successful'] += 1
                else:
                    stats['failed'] += 1
            except Exception as e:
                logger.error(f"分析文件失败 {json_file.name}: {str(e)}")
                stats['failed'] += 1

        self.print_stats(stats)
        self.save_report(stats)

    @staticmethod
    def _accumulate(data: Dict, stats: Dict):
        """Fold one successful result payload into the running stats.

        Args:
            data: Parsed JSON of one result file (``error_code == '0000'``).
            stats: Mutable stats accumulator built by :meth:`analyze`.
        """
        payload = data.get('items', {})
        query = payload.get('q', '')
        if query:
            stats['queries'].append(query)

        items = payload.get('item', [])
        stats['total_items'] += len(items)
        stats['items_per_query'].append(len(items))

        for item in items:
            # Price bucket — skip items whose price is not numeric.
            try:
                price = float(item.get('price', 0))
            except (ValueError, TypeError):
                pass
            else:
                if price < 10:
                    stats['price_ranges']['<$10'] += 1
                elif price < 50:
                    stats['price_ranges']['$10-$50'] += 1
                elif price < 100:
                    stats['price_ranges']['$50-$100'] += 1
                else:
                    stats['price_ranges']['≥$100'] += 1

            # Review count — only positive values feed the average.
            try:
                reviews = int(item.get('reviews', 0))
            except (ValueError, TypeError):
                pass
            else:
                if reviews > 0:
                    stats['avg_reviews'].append(reviews)

            # Star rating — only positive values feed the average.
            try:
                stars = float(item.get('stars', 0))
            except (ValueError, TypeError):
                pass
            else:
                if stars > 0:
                    stats['avg_stars'].append(stars)

    def print_stats(self, stats: Dict):
        """Log the aggregated statistics in a human-readable form.

        Args:
            stats: Stats accumulator built by :meth:`analyze`.
        """
        logger.info("\n" + "=" * 70)
        logger.info("统计结果")
        logger.info("=" * 70)

        # File-level counts. total_files > 0 is guaranteed by the early
        # return in analyze(), so the percentages cannot divide by zero.
        logger.info(f"\n【文件统计】")
        logger.info(f"总文件数: {stats['total_files']}")
        logger.info(f"成功: {stats['successful']} ({stats['successful']/stats['total_files']*100:.1f}%)")
        logger.info(f"失败: {stats['failed']} ({stats['failed']/stats['total_files']*100:.1f}%)")

        # Item-level counts.
        logger.info(f"\n【商品统计】")
        logger.info(f"总商品数: {stats['total_items']}")
        if stats['items_per_query']:
            avg_items = sum(stats['items_per_query']) / len(stats['items_per_query'])
            logger.info(f"平均每个查询: {avg_items:.1f} 个商品")
            logger.info(f"最多: {max(stats['items_per_query'])} 个")
            logger.info(f"最少: {min(stats['items_per_query'])} 个")

        # Price distribution, iterated in bucket order (cheapest first)
        # rather than alphabetically.
        if stats['price_ranges']:
            logger.info(f"\n【价格分布】")
            total_priced = sum(stats['price_ranges'].values())
            for price_range in PRICE_BUCKETS:
                count = stats['price_ranges'].get(price_range, 0)
                if count:
                    percentage = count / total_priced * 100
                    logger.info(f"{price_range}: {count} ({percentage:.1f}%)")

        # Review statistics.
        if stats['avg_reviews']:
            avg_reviews = sum(stats['avg_reviews']) / len(stats['avg_reviews'])
            logger.info(f"\n【评论统计】")
            logger.info(f"平均评论数: {avg_reviews:.0f}")
            logger.info(f"最高评论数: {max(stats['avg_reviews'])}")

        # Star-rating statistics.
        if stats['avg_stars']:
            avg_stars = sum(stats['avg_stars']) / len(stats['avg_stars'])
            logger.info(f"\n【评分统计】")
            logger.info(f"平均评分: {avg_stars:.2f}")

        logger.info("\n" + "=" * 70)

    def save_report(self, stats: Dict):
        """Write the aggregated statistics to ``analysis_report.json``.

        Args:
            stats: Stats accumulator built by :meth:`analyze`.
        """
        report_file = self.results_dir / "analysis_report.json"

        report = {
            'total_files': stats['total_files'],
            'successful': stats['successful'],
            'failed': stats['failed'],
            'success_rate': f"{stats['successful']/stats['total_files']*100:.1f}%",
            'total_items': stats['total_items'],
            'price_distribution': dict(stats['price_ranges'])
        }
        if stats['items_per_query']:
            report['avg_items_per_query'] = sum(stats['items_per_query']) / len(stats['items_per_query'])
            report['max_items'] = max(stats['items_per_query'])
            report['min_items'] = min(stats['items_per_query'])
        if stats['avg_reviews']:
            report['avg_reviews'] = sum(stats['avg_reviews']) / len(stats['avg_reviews'])
            report['max_reviews'] = max(stats['avg_reviews'])
        if stats['avg_stars']:
            report['avg_stars'] = sum(stats['avg_stars']) / len(stats['avg_stars'])

        try:
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=2)
            logger.info(f"分析报告已保存: {report_file}")
        except Exception as e:
            logger.error(f"保存报告失败: {str(e)}")

    def export_csv(self, output_file: Optional[str] = None):
        """Export every item from all successful result files to CSV.

        Args:
            output_file: Destination path; defaults to
                ``<results_dir>/items_export.csv`` when ``None``.
        """
        import csv

        if output_file is None:
            output_file = self.results_dir / "items_export.csv"
        logger.info(f"\n导出CSV: {output_file}")

        json_files = list(self.results_dir.glob("*.json"))
        with open(output_file, 'w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['Query', 'Title', 'Price', 'Reviews', 'Stars', 'Sales', 'URL'])
            for json_file in json_files:
                try:
                    with open(json_file, 'r', encoding='utf-8') as jf:
                        data = json.load(jf)
                    # Skip failed responses (and the analysis report itself,
                    # which has no error_code field).
                    if data.get('error_code') == '0000':
                        query = data.get('items', {}).get('q', '')
                        for item in data.get('items', {}).get('item', []):
                            writer.writerow([
                                query,
                                item.get('title', ''),
                                item.get('price', ''),
                                item.get('reviews', ''),
                                item.get('stars', ''),
                                item.get('sales', ''),
                                item.get('detail_url', '')
                            ])
                except Exception as e:
                    logger.error(f"导出失败 {json_file.name}: {str(e)}")

        logger.info(f"CSV导出完成: {output_file}")


def main():
    """CLI entry point: parse arguments, run analysis, optionally export CSV."""
    import argparse

    parser = argparse.ArgumentParser(description='分析Amazon爬取结果')
    parser.add_argument('--dir', type=str, default='amazon_results', help='结果目录路径')
    parser.add_argument('--csv', action='store_true', help='导出为CSV文件')
    parser.add_argument('--output', type=str, help='CSV输出文件路径')
    args = parser.parse_args()

    try:
        analyzer = ResultAnalyzer(args.dir)
        analyzer.analyze()
        if args.csv:
            analyzer.export_csv(args.output)
    except Exception as e:
        logger.error(f"分析失败: {str(e)}")


if __name__ == "__main__":
    main()