  #!/usr/bin/env python3
  # -*- coding: utf-8 -*-
  """
  Amazon商品数据爬虫 V2
  使用万邦API按关键字搜索商品并保存结果
  支持配置文件和命令行参数
  """
  
  import requests
  import json
  import time
  import os
  import sys
  import argparse
  from pathlib import Path
  from typing import Optional, Dict, Any
  from datetime import datetime
  import logging
  
  # Logging configuration
  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(levelname)s - %(message)s',
      handlers=[
          logging.FileHandler('amazon_crawler.log'),
          logging.StreamHandler()
      ]
  )
  logger = logging.getLogger(__name__)
  
  
  class AmazonCrawler:
      """亚马逊商品爬虫类"""
      
      def __init__(self, api_key: str, api_secret: str, results_dir: str = "amazon_results"):
          """
          初始化爬虫
          
          Args:
              api_key: API调用key
              api_secret: API调用密钥
              results_dir: 结果保存目录
          """
          self.api_key = api_key
          self.api_secret = api_secret
          self.base_url = "https://api-gw.onebound.cn/amazon/item_search"
          self.api_name = "item_search"
          
          # Create the directory where results will be saved
          self.results_dir = Path(results_dir)
          self.results_dir.mkdir(parents=True, exist_ok=True)
          
          # Request statistics
          self.total_requests = 0
          self.successful_requests = 0
          self.failed_requests = 0
          self.start_time = None
      
      def search_items(self, query: str, **kwargs) -> Optional[Dict[str, Any]]:
          """
          按关键字搜索商品
          
          Args:
              query: 搜索关键字
              **kwargs: 其他可选参数
              
          Returns:
              API响应的JSON数据,失败返回None
          """
          params = {
              'key': self.api_key,
              'secret': self.api_secret,
              'q': query,
              'cache': kwargs.get('cache', 'yes'),
              'result_type': kwargs.get('result_type', 'json'),
              'lang': kwargs.get('lang', 'cn'),
          }
          
          # Add any other optional parameters that were supplied
          optional_params = ['start_price', 'end_price', 'page', 'cat', 
                            'discount_only', 'sort', 'page_size', 'seller_info', 
                            'nick', 'ppath']
          for param in optional_params:
              if param in kwargs and kwargs[param]:
                  params[param] = kwargs[param]
          
          try:
              logger.info(f"正在请求: {query}")
              self.total_requests += 1
              
              response = requests.get(
                  self.base_url,
                  params=params,
                  timeout=30
              )
              response.raise_for_status()
              
              data = response.json()
              
              if data.get('error_code') == '0000':
                  logger.info(f"✓ 成功: {query} - 获得 {data.get('items', {}).get('real_total_results', 0)} 个结果")
                  self.successful_requests += 1
                  return data
              else:
                  logger.error(f"✗ API错误: {query} - {data.get('reason', 'Unknown error')}")
                  self.failed_requests += 1
                  return data
                  
          except requests.exceptions.RequestException as e:
              logger.error(f"✗ 请求失败: {query} - {str(e)}")
              self.failed_requests += 1
              return None
          except json.JSONDecodeError as e:
              logger.error(f"✗ JSON解析失败: {query} - {str(e)}")
              self.failed_requests += 1
              return None
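
      # Illustrative sketch of a direct call using some of the optional parameters
      # listed above (the parameter values here are examples only; the accepted
      # values are defined by the OneBound API and are not verified):
      #   crawler.search_items("wireless mouse", page=2, page_size=20, sort="price")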
      
      def save_result(self, query: str, data: Dict[str, Any], index: int):
          """保存搜索结果到JSON文件"""
          safe_query = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' 
                             for c in query)
          safe_query = safe_query.replace(' ', '_')[:50]
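          # Illustrative example: index 3 with the query "wireless mouse" becomes
          # "0003_wireless_mouse.json" under the format string below.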
          
          filename = f"{index:04d}_{safe_query}.json"
          filepath = self.results_dir / filename
          
          try:
              with open(filepath, 'w', encoding='utf-8') as f:
                  json.dump(data, f, ensure_ascii=False, indent=2)
              logger.debug(f"已保存: {filename}")
          except Exception as e:
              logger.error(f"保存失败: {filename} - {str(e)}")
      
      def crawl_from_file(self, queries_file: str, delay: float = 1.0, 
                         start_index: int = 0, max_queries: Optional[int] = None):
          """从文件读取查询列表并批量爬取"""
          self.start_time = datetime.now()
          logger.info("=" * 70)
          logger.info(f"Amazon爬虫启动 - {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
          logger.info("=" * 70)
          logger.info(f"查询文件: {queries_file}")
          logger.info(f"结果目录: {self.results_dir}")
          
          try:
              with open(queries_file, 'r', encoding='utf-8') as f:
                  queries = [line.strip() for line in f if line.strip()]
              
              total_queries = len(queries)
              logger.info(f"共读取 {total_queries} 个查询")
              
              if start_index > 0:
                  queries = queries[start_index:]
                  logger.info(f"从索引 {start_index} 开始")
              
              if max_queries:
                  queries = queries[:max_queries]
                  logger.info(f"限制爬取数量: {max_queries}")
              
              logger.info(f"请求间隔: {delay} 秒")
              logger.info("=" * 70)
              
              # Crawl the queries one by one
              for i, query in enumerate(queries, start=start_index):
                  progress = i - start_index + 1
                  total = len(queries)
                  percentage = (progress / total) * 100
                  
                  logger.info(f"[{progress}/{total}] ({percentage:.1f}%) - {query}")
                  
                  data = self.search_items(query)
                  
                  if data:
                      self.save_result(query, data, i)
                  else:
                      error_data = {
                          'error': 'Request failed',
                          'query': query,
                          'index': i,
                          'timestamp': datetime.now().isoformat()
                      }
                      self.save_result(query, error_data, i)
                  
                  # Delay before the next request
                  if progress < total:
                      time.sleep(delay)
              
              # Summary statistics
              end_time = datetime.now()
              duration = end_time - self.start_time
              
              logger.info("=" * 70)
              logger.info("爬取完成!")
              logger.info("=" * 70)
              logger.info(f"开始时间: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
              logger.info(f"结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
              logger.info(f"总耗时: {duration}")
              logger.info(f"总请求数: {self.total_requests}")
              logger.info(f"成功: {self.successful_requests} ({self.successful_requests/self.total_requests*100:.1f}%)")
              logger.info(f"失败: {self.failed_requests} ({self.failed_requests/self.total_requests*100:.1f}%)")
              logger.info(f"结果保存在: {self.results_dir.absolute()}")
              logger.info("=" * 70)
              
          except FileNotFoundError:
              logger.error(f"文件不存在: {queries_file}")
          except KeyboardInterrupt:
              logger.warning("\n用户中断爬取")
              logger.info(f"已完成: {self.successful_requests}/{self.total_requests}")
          except Exception as e:
              logger.error(f"爬取过程出错: {str(e)}", exc_info=True)
  
  
  def load_config():
      """加载配置文件"""
      try:
          # Try to import config.py
          import config
          return config
      except ImportError:
          logger.warning("未找到配置文件 config.py,使用默认配置")
          return None
  
  
  def main():
      """主函数"""
      parser = argparse.ArgumentParser(description='Amazon product data crawler')
      parser.add_argument('--key', type=str, help='API Key')
      parser.add_argument('--secret', type=str, help='API Secret')
      parser.add_argument('--queries', type=str, default='queries.txt',
                         help='Path to the queries file')
      parser.add_argument('--delay', type=float, default=2.0,
                         help='Delay between requests (seconds)')
      parser.add_argument('--start', type=int, default=0,
                         help='Start index')
      parser.add_argument('--max', type=int, default=None,
                         help='Maximum number of queries to crawl')
      parser.add_argument('--output', type=str, default='amazon_results',
                         help='Directory where results are saved')
      
      args = parser.parse_args()
      
      # Get the API credentials
      api_key = args.key
      api_secret = args.secret
      
      # If not provided on the command line, try to load them from the config file
      if not api_key or not api_secret:
          config = load_config()
          if config:
              api_key = api_key or getattr(config, 'API_KEY', None)
              api_secret = api_secret or getattr(config, 'API_SECRET', None)
      
      # If still missing, try to read them from environment variables
      if not api_key or not api_secret:
          api_key = api_key or os.getenv('ONEBOUND_API_KEY')
          api_secret = api_secret or os.getenv('ONEBOUND_API_SECRET')
      
      # Validate the API credentials
      if not api_key or not api_secret or \
         api_key == "your_api_key_here" or api_secret == "your_api_secret_here":
          logger.error("=" * 70)
          logger.error("错误: 未配置API密钥!")
          logger.error("")
          logger.error("请使用以下任一方式配置API密钥:")
          logger.error("1. 命令行参数: --key YOUR_KEY --secret YOUR_SECRET")
          logger.error("2. 配置文件: 复制 config.example.py 为 config.py 并填入密钥")
          logger.error("3. 环境变量: ONEBOUND_API_KEY 和 ONEBOUND_API_SECRET")
          logger.error("=" * 70)
          return
      
      # Create the crawler instance
      crawler = AmazonCrawler(api_key, api_secret, args.output)
      
      # Start crawling
      crawler.crawl_from_file(
          queries_file=args.queries,
          delay=args.delay,
          start_index=args.start,
          max_queries=args.max
      )
  
  
  if __name__ == "__main__":
      main()