#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test script: fetch Shopee search results for the first few query terms.

Reads query terms from ``queries.txt`` next to this script, sends each of the
first ``TEST_COUNT`` terms to the ``item_search`` endpoint at ``API_URL`` and
writes every JSON response into the ``test_results/`` directory.
"""

import json
import os
import re
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import urlencode

try:
    import requests
except ImportError:
    # Keep the module importable without the third-party dependency;
    # fetch_shopee_data() reports the missing package when it is called.
    requests = None

# API configuration.
# SECURITY NOTE: these credentials were hard-coded in the file. They are kept
# as fallback defaults so existing runs keep working, but they should be
# rotated and supplied through the environment variables below instead of
# living in source control.
API_KEY = os.environ.get('ONEBOUND_API_KEY', 't8618339029')
API_SECRET = os.environ.get('ONEBOUND_API_SECRET', '9029f568')
API_URL = 'https://api-gw.onebound.cn/shopee/item_search'

# Run configuration.
COUNTRY = '.com.my'
PAGE = 1
DELAY = 2            # seconds to sleep between consecutive requests
TEST_COUNT = 5       # only the first TEST_COUNT query terms are fetched
REQUEST_TIMEOUT = 30  # seconds, passed to requests.get

# Characters that are path separators, reserved on common filesystems, or
# control characters. The plain space is included because result filenames
# have always used '_' in place of spaces.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f ]')


def result_filename(idx, query):
    """Return the JSON filename used to store the response for *query*.

    The name is ``<idx zero-padded to 4 digits>_<first 30 chars of query>.json``
    with every space, path separator, reserved or control character replaced
    by ``_`` so the result is always a single, valid path component.
    """
    stem = _UNSAFE_FILENAME_CHARS.sub('_', query[:30])
    return f"{idx:04d}_{stem}.json"


def _count_items(data):
    """Return the number of entries under ``data['items']['item']``.

    Tolerates ``items`` being missing, ``None`` or already a list, so an
    unexpected payload shape never discards an otherwise valid response.
    """
    items = data.get('items')
    if isinstance(items, dict):
        items = items.get('item')
    return len(items) if isinstance(items, (list, dict)) else 0


def fetch_shopee_data(query):
    """Request one page of search results for *query*.

    Returns the decoded JSON object (also when the API reports an error, so
    the caller can store the error payload), or ``None`` when the request
    fails or the body is not a JSON object.

    Raises:
        RuntimeError: if the ``requests`` package is not installed.
    """
    if requests is None:
        raise RuntimeError(
            "The 'requests' package is required: pip install requests"
        )

    params = {
        'key': API_KEY,
        'secret': API_SECRET,
        'q': query,
        'page': PAGE,
        'country': COUNTRY,
        'cache': 'yes',
        'result_type': 'json',
        'lang': 'en',
    }
    url = f"{API_URL}?{urlencode(params)}"

    try:
        print(" 请求中...")
        response = requests.get(url, timeout=REQUEST_TIMEOUT)
        # A non-JSON body raises a ValueError subclass here.
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f" ✗ 失败: {e}")
        return None

    if not isinstance(data, dict):
        print(f" ✗ 失败: unexpected response type {type(data).__name__}")
        return None

    if data.get('error_code') == '0000':
        print(f" ✓ 成功! 获取 {_count_items(data)} 个商品")
    else:
        print(f" ✗ API错误: {data.get('reason')}")
    return data


def main():
    """Fetch the first ``TEST_COUNT`` queries and save each response to disk."""
    script_dir = Path(__file__).parent
    query_file = script_dir / 'queries.txt'
    results_dir = script_dir / 'test_results'
    results_dir.mkdir(exist_ok=True)

    print("=" * 60)
    print(f"Shopee API 测试 (前{TEST_COUNT}个查询词)")
    print("=" * 60)

    # Load the query terms: one per line, blank lines ignored.
    try:
        with open(query_file, 'r', encoding='utf-8') as f:
            stripped = (line.strip() for line in f)
            queries = [q for q in stripped if q][:TEST_COUNT]
    except FileNotFoundError:
        raise SystemExit(f"Query file not found: {query_file}") from None

    print(f"测试数量: {len(queries)}")
    print(f"结果目录: {results_dir}")
    print("=" * 60)

    # Fetch each query and store its response.
    for idx, query in enumerate(queries, 1):
        print(f"\n[{idx}/{len(queries)}] '{query}'")

        data = fetch_shopee_data(query)
        if data:
            filename = result_filename(idx, query)
            filepath = results_dir / filename
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            print(f" 已保存: {filename}")

        # Pause between requests, but not after the last one.
        if idx < len(queries):
            print(f" 等待 {DELAY} 秒...")
            time.sleep(DELAY)

    print("\n" + "=" * 60)
    print("测试完成!")
    print("=" * 60)


if __name__ == '__main__':
    main()