# amazon_crawler.py (10.2 KB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Amazon商品数据爬虫 V2
使用万邦API按关键字搜索商品并保存结果
支持配置文件和命令行参数
"""

import requests
import json
import time
import os
import sys
import argparse
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime
import logging

# Configure logging: mirror every message both to a log file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('amazon_crawler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class AmazonCrawler:
    """Amazon item crawler backed by the OneBound `item_search` API.

    Each query's response is persisted to its own JSON file inside
    ``results_dir``; simple request counters are kept for a final summary.
    """

    def __init__(self, api_key: str, api_secret: str, results_dir: str = "amazon_results"):
        """
        Initialize the crawler.

        Args:
            api_key: OneBound API key.
            api_secret: OneBound API secret.
            results_dir: Directory where result JSON files are saved
                (created, including parents, if it does not exist).
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = "https://api-gw.onebound.cn/amazon/item_search"
        self.api_name = "item_search"

        # Create the results directory up front so save_result never fails
        # on a missing parent.
        self.results_dir = Path(results_dir)
        self.results_dir.mkdir(parents=True, exist_ok=True)

        # Request statistics for the final summary.
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.start_time = None

    def search_items(self, query: str, **kwargs) -> Optional[Dict[str, Any]]:
        """
        Search items by keyword.

        Args:
            query: Search keyword.
            **kwargs: Optional API parameters (cache, result_type, lang,
                start_price, end_price, page, cat, discount_only, sort,
                page_size, seller_info, nick, ppath).

        Returns:
            The parsed JSON response. API-level errors are still returned
            (so the caller can persist the error payload); None is returned
            only when the HTTP request or JSON parsing fails.
        """
        params = {
            'key': self.api_key,
            'secret': self.api_secret,
            'q': query,
            'cache': kwargs.get('cache', 'yes'),
            'result_type': kwargs.get('result_type', 'json'),
            'lang': kwargs.get('lang', 'cn'),
        }

        # Forward only the optional parameters that were supplied with a
        # truthy value.
        optional_params = ['start_price', 'end_price', 'page', 'cat',
                          'discount_only', 'sort', 'page_size', 'seller_info',
                          'nick', 'ppath']
        for param in optional_params:
            if param in kwargs and kwargs[param]:
                params[param] = kwargs[param]

        try:
            logger.info(f"Making request: {query}")
            self.total_requests += 1

            response = requests.get(
                self.base_url,
                params=params,
                timeout=30
            )
            response.raise_for_status()

            data = response.json()

            # OneBound signals success with error_code '0000'.
            if data.get('error_code') == '0000':
                logger.info(f"✓ Success: {query} - Got {data.get('items', {}).get('real_total_results', 0)} results")
                self.successful_requests += 1
                return data
            else:
                logger.error(f"✗ API error: {query} - {data.get('reason', 'Unknown error')}")
                self.failed_requests += 1
                return data

        except requests.exceptions.RequestException as e:
            logger.error(f"✗ Request failed: {query} - {str(e)}")
            self.failed_requests += 1
            return None
        except ValueError as e:
            # Bug fix: response.json() raises a ValueError subclass; on
            # simplejson-backed requests versions it is NOT a
            # json.JSONDecodeError, so catch the common base class.
            logger.error(f"✗ JSON parse failed: {query} - {str(e)}")
            self.failed_requests += 1
            return None

    def save_result(self, query: str, data: Dict[str, Any], index: int):
        """Save one search result as a pretty-printed UTF-8 JSON file.

        The filename combines the zero-padded index with a sanitized,
        length-limited version of the query. Write failures are logged,
        never raised (best-effort persistence).
        """
        # Keep only filesystem-safe characters, then collapse spaces.
        safe_query = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_'
                           for c in query)
        safe_query = safe_query.replace(' ', '_')[:50]

        filename = f"{index:04d}_{safe_query}.json"
        filepath = self.results_dir / filename

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            # Bug fix: the original logged the literal placeholder
            # "(unknown)" instead of the actual file path.
            logger.debug(f"Saved: {filepath}")
        except Exception as e:
            logger.error(f"Save failed: {filepath} - {str(e)}")

    def crawl_from_file(self, queries_file: str, delay: float = 1.0,
                       start_index: int = 0, max_queries: Optional[int] = None):
        """Read queries from a text file (one per line) and crawl them all.

        Args:
            queries_file: Path to the query list; blank lines are skipped.
            delay: Seconds to sleep between consecutive requests.
            start_index: Skip the first ``start_index`` queries (resume support).
            max_queries: Upper bound on the number of queries to run, or None.
        """
        self.start_time = datetime.now()
        logger.info("=" * 70)
        logger.info(f"Amazon crawler started - {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info("=" * 70)
        logger.info(f"Queries file: {queries_file}")
        logger.info(f"Results directory: {self.results_dir}")

        try:
            with open(queries_file, 'r', encoding='utf-8') as f:
                queries = [line.strip() for line in f if line.strip()]

            total_queries = len(queries)
            logger.info(f"Total queries read: {total_queries}")

            if start_index > 0:
                queries = queries[start_index:]
                logger.info(f"Starting from index {start_index}")

            if max_queries:
                queries = queries[:max_queries]
                logger.info(f"Limit crawl count to: {max_queries}")

            logger.info(f"Request interval: {delay} seconds")
            logger.info("=" * 70)

            # Crawl one query at a time; failed requests are recorded as
            # error stubs so every index still produces an output file.
            for i, query in enumerate(queries, start=start_index):
                progress = i - start_index + 1
                total = len(queries)
                percentage = (progress / total) * 100

                logger.info(f"[{progress}/{total}] ({percentage:.1f}%) - {query}")

                data = self.search_items(query)

                if data:
                    self.save_result(query, data, i)
                else:
                    error_data = {
                        'error': 'Request failed',
                        'query': query,
                        'index': i,
                        'timestamp': datetime.now().isoformat()
                    }
                    self.save_result(query, error_data, i)

                # Throttle between requests; no sleep after the last one.
                if progress < total:
                    time.sleep(delay)

            # Final summary.
            end_time = datetime.now()
            duration = end_time - self.start_time

            logger.info("=" * 70)
            logger.info("Crawling completed!")
            logger.info("=" * 70)
            logger.info(f"Start time: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
            logger.info(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            logger.info(f"Total duration: {duration}")
            logger.info(f"Total requests: {self.total_requests}")
            # Bug fix: guard the percentage math so an empty queries file
            # does not raise ZeroDivisionError and lose the summary.
            if self.total_requests:
                logger.info(f"Successful: {self.successful_requests} ({self.successful_requests/self.total_requests*100:.1f}%)")
                logger.info(f"Failed: {self.failed_requests} ({self.failed_requests/self.total_requests*100:.1f}%)")
            else:
                logger.info(f"Successful: {self.successful_requests}")
                logger.info(f"Failed: {self.failed_requests}")
            logger.info(f"Results saved to: {self.results_dir.absolute()}")
            logger.info("=" * 70)

        except FileNotFoundError:
            logger.error(f"File not found: {queries_file}")
        except KeyboardInterrupt:
            logger.warning("\nUser interrupted crawling")
            logger.info(f"Completed: {self.successful_requests}/{self.total_requests}")
        except Exception as e:
            logger.error(f"Error during crawling: {str(e)}", exc_info=True)


def load_config():
    """Load the optional local configuration module.

    Returns:
        The imported ``config`` module when config.py exists on the
        import path, otherwise None (with a warning logged).
    """
    try:
        import config
    except ImportError:
        logger.warning("Config file config.py not found, using default configuration")
        return None
    return config


def main():
    """CLI entry point: parse arguments, resolve API credentials, run the crawl."""
    parser = argparse.ArgumentParser(description='Amazon商品数据爬虫')
    parser.add_argument('--key', type=str, help='API Key')
    parser.add_argument('--secret', type=str, help='API Secret')
    parser.add_argument('--queries', type=str, default='queries.txt',
                       help='查询文件路径')
    parser.add_argument('--delay', type=float, default=2.0,
                       help='请求间隔(秒)')
    parser.add_argument('--start', type=int, default=0,
                       help='起始索引')
    parser.add_argument('--max', type=int, default=None,
                       help='最大爬取数量')
    parser.add_argument('--output', type=str, default='amazon_results',
                       help='结果保存目录')

    opts = parser.parse_args()

    # Credential resolution order: CLI flags, then config.py, then
    # environment variables. Each later source only fills in blanks.
    key = opts.key
    secret = opts.secret

    if not (key and secret):
        cfg = load_config()
        if cfg:
            key = key or getattr(cfg, 'API_KEY', None)
            secret = secret or getattr(cfg, 'API_SECRET', None)

    if not (key and secret):
        key = key or os.getenv('ONEBOUND_API_KEY')
        secret = secret or os.getenv('ONEBOUND_API_SECRET')

    # Reject missing credentials as well as the untouched placeholder
    # values copied from config.example.py.
    has_placeholder = key == "your_api_key_here" or secret == "your_api_secret_here"
    if not key or not secret or has_placeholder:
        logger.error("=" * 70)
        logger.error("Error: API key not configured!")
        logger.error("")
        logger.error("Please configure API key using one of the following methods:")
        logger.error("1. Command line arguments: --key YOUR_KEY --secret YOUR_SECRET")
        logger.error("2. Config file: Copy config.example.py to config.py and fill in the keys")
        logger.error("3. Environment variables: ONEBOUND_API_KEY and ONEBOUND_API_SECRET")
        logger.error("=" * 70)
        return

    # Build the crawler and process the query file.
    crawler = AmazonCrawler(key, secret, opts.output)
    crawler.crawl_from_file(
        queries_file=opts.queries,
        delay=opts.delay,
        start_index=opts.start,
        max_queries=opts.max
    )


# Run the CLI entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()