analyze_results.py 10.3 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Amazon爬取结果分析工具
用于统计和分析爬取到的JSON数据
"""

import json
from pathlib import Path
from typing import Dict, List
from collections import defaultdict
import logging

# Configure the root logger once at import time: records at INFO and above
# are emitted, each prefixed with a timestamp and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)


class ResultAnalyzer:
    """Aggregate statistics over the JSON result files of a crawl.

    Every ``*.json`` file in the results directory is read as one response.
    A response is counted as successful when its ``error_code`` equals
    ``'0000'``; its ``items`` object is then read for the query string
    (``q``) and the product list (``item``). Any other file, including one
    that cannot be read or has an unexpected structure, is counted as failed.
    """

    # File written by save_report(). It lives inside the results directory,
    # so it must be skipped when collecting result files; otherwise every
    # re-run would count the previous report as a failed result.
    REPORT_FILENAME = "analysis_report.json"

    # Value of ``error_code`` that marks a successful response.
    SUCCESS_CODE = '0000'

    # (label, exclusive upper bound) of each price bucket, in display order.
    PRICE_BUCKETS = (('<$10', 10), ('$10-$50', 50), ('$50-$100', 100))
    # Label for prices at or above the last upper bound.
    PRICE_BUCKET_OVERFLOW = '≥$100'

    # (CSV header, item key) pairs written by export_csv() after the query.
    CSV_ITEM_COLUMNS = (
        ('Title', 'title'),
        ('Price', 'price'),
        ('Reviews', 'reviews'),
        ('Stars', 'stars'),
        ('Sales', 'sales'),
        ('URL', 'detail_url'),
    )

    def __init__(self, results_dir: str = "amazon_results"):
        """
        Create an analyzer bound to a results directory.

        Args:
            results_dir: Path of the directory holding the JSON result files.

        Raises:
            FileNotFoundError: If ``results_dir`` does not exist.
        """
        self.results_dir = Path(results_dir)
        if not self.results_dir.exists():
            logger.error(f"结果目录不存在: {self.results_dir}")
            raise FileNotFoundError(f"Directory not found: {self.results_dir}")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _result_files(self) -> List[Path]:
        """Return the result files in name order, excluding our own report."""
        return sorted(
            path for path in self.results_dir.glob("*.json")
            if path.name != self.REPORT_FILENAME
        )

    def _load_items(self, json_file: Path):
        """Load one result file and return ``(query, items)``.

        Returns:
            ``None`` when the file holds a response whose ``error_code`` is
            not the success code; otherwise a ``(query, items)`` tuple where
            ``items`` is a list of dicts.

        Raises:
            OSError: If the file cannot be read.
            ValueError: If the file is not valid UTF-8 JSON or the payload
                does not have the expected object/list structure.
        """
        with open(json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if not isinstance(data, dict):
            raise ValueError("top-level JSON value is not an object")
        if data.get('error_code') != self.SUCCESS_CODE:
            return None

        container = data.get('items', {})
        if not isinstance(container, dict):
            raise ValueError("'items' is not an object")

        items = container.get('item', [])
        if not isinstance(items, list) or not all(isinstance(i, dict) for i in items):
            raise ValueError("'items.item' is not a list of objects")

        return container.get('q', ''), items

    @staticmethod
    def _parse_number(value, cast):
        """Convert ``value`` with ``cast``; return None if missing or invalid."""
        if value is None:
            return None
        try:
            return cast(value)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int() of an infinite float.
            return None

    @classmethod
    def _price_bucket(cls, price: float) -> str:
        """Return the label of the price bucket that ``price`` falls into."""
        for label, upper_bound in cls.PRICE_BUCKETS:
            if price < upper_bound:
                return label
        return cls.PRICE_BUCKET_OVERFLOW

    def _tally_item(self, stats: Dict, item: Dict) -> None:
        """Add one item's price, review count and rating to ``stats``."""
        # An item without a usable price is left out of the distribution
        # instead of being treated as price 0. ``price == price`` rejects NaN.
        price = self._parse_number(item.get('price'), float)
        if price is not None and price == price:
            stats['price_ranges'][self._price_bucket(price)] += 1

        # Only positive values take part in the review/rating averages.
        reviews = self._parse_number(item.get('reviews'), int)
        if reviews is not None and reviews > 0:
            stats['avg_reviews'].append(reviews)

        stars = self._parse_number(item.get('stars'), float)
        if stars is not None and stars > 0:
            stats['avg_stars'].append(stars)

    def _ordered_price_ranges(self, price_ranges: Dict) -> Dict:
        """Return ``price_ranges`` as a dict ordered by ascending price bucket.

        Labels that are not one of the known buckets are appended afterwards
        in sorted order.
        """
        known = [label for label, _ in self.PRICE_BUCKETS]
        known.append(self.PRICE_BUCKET_OVERFLOW)
        ordered = {
            label: price_ranges[label]
            for label in known
            if label in price_ranges
        }
        for label in sorted(price_ranges):
            ordered.setdefault(label, price_ranges[label])
        return ordered

    @staticmethod
    def _percent(part, whole) -> float:
        """Return ``part / whole`` as a percentage, or 0.0 when ``whole`` is 0."""
        return part / whole * 100 if whole else 0.0

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def analyze(self):
        """Analyze every result file, log the statistics and save a report.

        Returns early (after a warning) when the directory holds no result
        files. Each file is counted exactly once, as successful or failed.
        """
        logger.info("=" * 70)
        logger.info("Amazon爬取结果分析")
        logger.info("=" * 70)
        logger.info(f"结果目录: {self.results_dir.absolute()}")

        json_files = self._result_files()
        logger.info(f"JSON文件数量: {len(json_files)}")

        if not json_files:
            logger.warning("未找到任何JSON文件")
            return

        stats = {
            'total_files': len(json_files),
            'successful': 0,
            'failed': 0,
            'total_items': 0,
            'queries': [],
            'items_per_query': [],
            'price_ranges': defaultdict(int),
            'avg_reviews': [],
            'avg_stars': []
        }

        logger.info("\n正在分析文件...")
        for json_file in json_files:
            # Load and validate first, so that a malformed file is counted
            # as failed only, never as successful and failed.
            try:
                payload = self._load_items(json_file)
            except (OSError, ValueError) as e:
                logger.error(f"分析文件失败 {json_file.name}: {str(e)}")
                stats['failed'] += 1
                continue

            if payload is None:
                stats['failed'] += 1
                continue

            query, items = payload
            stats['successful'] += 1
            if query:
                stats['queries'].append(query)
            stats['total_items'] += len(items)
            stats['items_per_query'].append(len(items))
            for item in items:
                self._tally_item(stats, item)

        self.print_stats(stats)
        self.save_report(stats)

    def print_stats(self, stats: Dict):
        """Log the collected statistics.

        Args:
            stats: The statistics dict built by ``analyze`` (keys
                ``total_files``, ``successful``, ``failed``, ``total_items``,
                ``items_per_query``, ``price_ranges``, ``avg_reviews`` and
                ``avg_stars``).
        """
        logger.info("\n" + "=" * 70)
        logger.info("统计结果")
        logger.info("=" * 70)

        # File counts
        total_files = stats['total_files']
        success_pct = self._percent(stats['successful'], total_files)
        failed_pct = self._percent(stats['failed'], total_files)
        logger.info("\n【文件统计】")
        logger.info(f"总文件数: {total_files}")
        logger.info(f"成功: {stats['successful']} ({success_pct:.1f}%)")
        logger.info(f"失败: {stats['failed']} ({failed_pct:.1f}%)")

        # Item counts
        logger.info("\n【商品统计】")
        logger.info(f"总商品数: {stats['total_items']}")
        per_query = stats['items_per_query']
        if per_query:
            avg_items = sum(per_query) / len(per_query)
            logger.info(f"平均每个查询: {avg_items:.1f} 个商品")
            logger.info(f"最多: {max(per_query)} 个")
            logger.info(f"最少: {min(per_query)} 个")

        # Price distribution, listed from the cheapest bucket upwards
        if stats['price_ranges']:
            logger.info("\n【价格分布】")
            ordered = self._ordered_price_ranges(stats['price_ranges'])
            total_priced = sum(ordered.values())
            for price_range, count in ordered.items():
                percentage = self._percent(count, total_priced)
                logger.info(f"{price_range}: {count} ({percentage:.1f}%)")

        # Review counts
        if stats['avg_reviews']:
            avg_reviews = sum(stats['avg_reviews']) / len(stats['avg_reviews'])
            max_reviews = max(stats['avg_reviews'])
            logger.info("\n【评论统计】")
            logger.info(f"平均评论数: {avg_reviews:.0f}")
            logger.info(f"最高评论数: {max_reviews}")

        # Ratings
        if stats['avg_stars']:
            avg_stars = sum(stats['avg_stars']) / len(stats['avg_stars'])
            logger.info("\n【评分统计】")
            logger.info(f"平均评分: {avg_stars:.2f}")

        logger.info("\n" + "=" * 70)

    def save_report(self, stats: Dict):
        """Write a JSON summary of ``stats`` into the results directory.

        The report is saved as ``REPORT_FILENAME``. A failure to write it is
        logged and not raised.

        Args:
            stats: The statistics dict built by ``analyze``.
        """
        report_file = self.results_dir / self.REPORT_FILENAME

        success_pct = self._percent(stats['successful'], stats['total_files'])
        report = {
            'total_files': stats['total_files'],
            'successful': stats['successful'],
            'failed': stats['failed'],
            'success_rate': f"{success_pct:.1f}%",
            'total_items': stats['total_items'],
            'price_distribution': self._ordered_price_ranges(stats['price_ranges'])
        }

        per_query = stats['items_per_query']
        if per_query:
            report['avg_items_per_query'] = sum(per_query) / len(per_query)
            report['max_items'] = max(per_query)
            report['min_items'] = min(per_query)

        if stats['avg_reviews']:
            report['avg_reviews'] = sum(stats['avg_reviews']) / len(stats['avg_reviews'])
            report['max_reviews'] = max(stats['avg_reviews'])

        if stats['avg_stars']:
            report['avg_stars'] = sum(stats['avg_stars']) / len(stats['avg_stars'])

        try:
            with open(report_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=2)
            logger.info(f"分析报告已保存: {report_file}")
        except (OSError, TypeError, ValueError) as e:
            # Best effort: the statistics were already logged by print_stats.
            logger.error(f"保存报告失败: {str(e)}")

    def export_csv(self, output_file: str = None):
        """Export the items of every successful result file to a CSV file.

        One row is written per item, with the file's query in the first
        column. Files that cannot be read or parsed are logged and skipped;
        unsuccessful responses are skipped silently.

        Args:
            output_file: Destination path. When None, ``items_export.csv``
                inside the results directory is used.

        Raises:
            OSError: If the output file cannot be opened for writing.
        """
        import csv

        if output_file is None:
            output_file = self.results_dir / "items_export.csv"

        logger.info(f"\n导出CSV: {output_file}")

        json_files = self._result_files()
        item_keys = [key for _, key in self.CSV_ITEM_COLUMNS]

        with open(output_file, 'w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['Query'] + [header for header, _ in self.CSV_ITEM_COLUMNS])

            for json_file in json_files:
                try:
                    payload = self._load_items(json_file)
                except (OSError, ValueError) as e:
                    logger.error(f"导出失败 {json_file.name}: {str(e)}")
                    continue

                if payload is None:
                    continue

                query, items = payload
                writer.writerows(
                    [query] + [item.get(key, '') for key in item_keys]
                    for item in items
                )

        logger.info(f"CSV导出完成: {output_file}")


def main(argv=None):
    """Command-line entry point.

    Parses the options, runs the analysis on the chosen directory and, when
    ``--csv`` is given, exports the items to CSV.

    Args:
        argv: Argument list to parse. When None, argparse reads
            ``sys.argv[1:]``.

    Returns:
        0 on success, 1 when the analysis or export raised an exception
        (the error is logged, not re-raised).
    """
    import argparse

    parser = argparse.ArgumentParser(description='分析Amazon爬取结果')
    parser.add_argument('--dir', type=str, default='amazon_results',
                        help='结果目录路径')
    parser.add_argument('--csv', action='store_true',
                        help='导出为CSV文件')
    parser.add_argument('--output', type=str,
                        help='CSV输出文件路径')

    args = parser.parse_args(argv)

    try:
        analyzer = ResultAnalyzer(args.dir)
        analyzer.analyze()

        if args.csv:
            analyzer.export_csv(args.output)

    except Exception as e:
        # Top-level boundary: report the failure and signal it through the
        # exit status instead of ending the process as if it had succeeded.
        logger.error(f"分析失败: {str(e)}")
        return 1

    return 0


if __name__ == "__main__":
    # Propagate main()'s status so shells and schedulers can detect failure.
    raise SystemExit(main())