#!/usr/bin/env python3
"""
Simple API server for testing aggregation functionality without external dependencies.
"""

import json
import random
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

# URL paths (query string excluded) that are routed to the search handler.
_SEARCH_PATHS = frozenset({'/', '/search', '/search/'})

# Page size used when the request omits `size` or sends an unusable value.
_DEFAULT_SIZE = 10

# (request filter key, product field) pairs matched by membership.
_TERM_FILTER_FIELDS = (
    ('category_name', 'categoryName'),
    ('brand_name', 'brandName'),
)

# Price-range filter labels mapped to the predicate a price must satisfy.
_PRICE_RANGE_CHECKS = {
    '0-50': lambda price: price <= 50,
    '50-100': lambda price: 50 < price <= 100,
    '100-200': lambda price: 100 < price <= 200,
    '200+': lambda price: price > 200,
}

# Supported `sort_by` values mapped to (sort key, reverse flag).
# Any other value leaves the products in their catalogue order.
_SORT_OPTIONS = {
    'price_asc': (lambda product: product.get('price', 0), False),
    'price_desc': (lambda product: product.get('price', 0), True),
    'time_desc': (lambda product: product.get('create_time', ''), True),
}

# Fixed catalogue every mock search draws from.
_SAMPLE_PRODUCTS = (
    {
        "skuId": 1001,
        "name": "芭比娃娃梦幻套装",
        "enSpuName": "Barbie Dream House Playset",
        "ruSkuName": "Кукла Барби Мечтательный домик",
        "categoryName": "芭比",
        "brandName": "美泰",
        "supplierName": "义乌玩具厂",
        "price": 89.99,
        "imageUrl": "https://picsum.photos/seed/barbie1/200/200.jpg",
        "create_time": "2024-01-15T10:30:00Z",
        "days_since_last_update": 45,
    },
    {
        "skuId": 1002,
        "name": "芭比娃娃时尚系列",
        "enSpuName": "Barbie Fashion Doll Collection",
        "ruSkuName": "Кукла Барби Модная коллекция",
        "categoryName": "芭比",
        "brandName": "美泰",
        "supplierName": "汕头玩具公司",
        "price": 45.50,
        "imageUrl": "https://picsum.photos/seed/barbie2/200/200.jpg",
        "create_time": "2024-02-20T14:15:00Z",
        "days_since_last_update": 30,
    },
    {
        "skuId": 1003,
        "name": "儿童积木套装",
        "enSpuName": "Kids Building Blocks Set",
        "ruSkuName": "Детский строительный набор",
        "categoryName": "积木",
        "brandName": "乐高",
        "supplierName": "深圳塑胶制品厂",
        "price": 158.00,
        "imageUrl": "https://picsum.photos/seed/blocks1/200/200.jpg",
        "create_time": "2024-01-10T09:20:00Z",
        "days_since_last_update": 60,
    },
    {
        "skuId": 1004,
        "name": "消防车玩具模型",
        "enSpuName": "Fire Truck Toy Model",
        "ruSkuName": "Модель пожарной машины",
        "categoryName": "小汽车",
        "brandName": "多美卡",
        "supplierName": "东莞玩具制造厂",
        "price": 78.50,
        "imageUrl": "https://picsum.photos/seed/firetruck1/200/200.jpg",
        "create_time": "2024-03-05T16:45:00Z",
        "days_since_last_update": 15,
    },
    {
        "skuId": 1005,
        "name": "婴儿毛绒玩具",
        "enSpuName": "Baby Plush Toy",
        "ruSkuName": "Детская плюшевая игрушка",
        "categoryName": "婴儿娃娃",
        "brandName": "迪士尼",
        "supplierName": "上海礼品公司",
        "price": 32.00,
        "imageUrl": "https://picsum.photos/seed/plush1/200/200.jpg",
        "create_time": "2024-02-14T11:30:00Z",
        "days_since_last_update": 25,
    },
)

# Known terms aggregations: name -> (bucket key, min doc_count, max doc_count).
_TERMS_BUCKET_SPECS = {
    'category_name': (
        ("芭比", 15, 35),
        ("儿童娃娃", 8, 20),
        ("积木", 5, 15),
        ("小汽车", 3, 12),
        ("婴儿娃娃", 4, 10),
        ("人物", 6, 18),
    ),
    'brand_name': (
        ("美泰", 20, 40),
        ("乐高", 10, 25),
        ("迪士尼", 8, 20),
        ("多美卡", 5, 15),
        ("孩之宝", 6, 18),
        ("万代", 3, 10),
    ),
    'material_type': (
        ("塑料", 40, 80),
        ("布绒", 8, 20),
        ("金属", 5, 15),
        ("木质", 3, 12),
    ),
}


def _normalize_size(size, default=_DEFAULT_SIZE):
    """Coerce a requested page size to a non-negative int, or *default* if unusable."""
    try:
        value = int(size)
    except (TypeError, ValueError, OverflowError):
        return default
    # A negative slice bound would count from the end of the list instead of
    # limiting the page, so clamp it to zero.
    return max(value, 0)


def _as_collection(value):
    """Wrap a scalar filter value in a list; return real collections untouched."""
    # Strings are iterable, but a filter given as "0-50" means one label, not
    # the characters '0', '-', '5', '0'.
    if isinstance(value, (str, bytes)) or not hasattr(value, '__iter__'):
        return [value]
    return value


def _price_in_ranges(price, labels):
    """Return True when *price* satisfies at least one known range label."""
    for label in labels:
        check = _PRICE_RANGE_CHECKS.get(label) if isinstance(label, str) else None
        if check is not None and check(price):
            return True
    return False


def _matches_filters(product, filters):
    """Return True when *product* passes every filter present in *filters*."""
    for filter_key, product_field in _TERM_FILTER_FIELDS:
        wanted = filters.get(filter_key)
        if wanted is not None and product[product_field] not in _as_collection(wanted):
            return False

    wanted_ranges = filters.get('price_ranges')
    if wanted_ranges is not None:
        return _price_in_ranges(product['price'], _as_collection(wanted_ranges))
    return True


def _mock_terms_aggregation(agg_name):
    """Build a terms aggregation with random doc counts for *agg_name*."""
    bucket_specs = _TERMS_BUCKET_SPECS.get(agg_name)
    if bucket_specs is None:
        # Unknown field: fall back to five generic buckets.
        buckets = [
            {"key": f"选项{i+1}", "doc_count": random.randint(5, 25)}
            for i in range(5)
        ]
    else:
        buckets = [
            {"key": key, "doc_count": random.randint(low, high)}
            for key, low, high in bucket_specs
        ]
    return {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": random.randint(10, 50),
        "buckets": buckets,
    }


def _mock_range_aggregation(ranges):
    """Build a range aggregation with one random-count bucket per requested range."""
    buckets = []
    for range_spec in ranges:
        bucket = {
            "key": range_spec.get('key', 'unknown'),
            "doc_count": random.randint(5, 30),
        }
        # Echo back whichever bounds the caller supplied.
        if 'to' in range_spec:
            bucket['to'] = range_spec['to']
        if 'from' in range_spec:
            bucket['from'] = range_spec['from']
        buckets.append(bucket)
    return {"buckets": buckets}


class SearchAPIHandler(BaseHTTPRequestHandler):
    """Simple API handler for search requests."""

    def do_OPTIONS(self):
        """Handle CORS preflight requests."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()

    def do_POST(self):
        """Route POST requests: search paths go to handle_search, others get 404."""
        # Compare only the path component so a query string does not cause a 404.
        if urlparse(self.path).path in _SEARCH_PATHS:
            self.handle_search()
        else:
            self.send_response(404)
            self.end_headers()

    def handle_search(self):
        """Handle search requests with aggregations.

        Responds with 400 when the body cannot be read as a JSON object,
        500 when building the mock response fails, and 200 with the search
        payload otherwise. Error bodies have the keys "error" and "detail".
        """
        try:
            request_data = self._read_json_body()
        except ValueError as exc:
            print(f"Error handling request: {exc}")
            self._send_json(400, {"error": str(exc), "detail": "Bad request"})
            return

        try:
            response = self._build_search_response(request_data)
        except Exception as exc:  # last-resort boundary: report instead of dropping the connection
            print(f"Error handling request: {exc}")
            self._send_json(500, {"error": str(exc), "detail": "Internal server error"})
            return

        self._send_json(200, response)
        print(
            f"Response sent with {len(response['hits'])} results "
            f"and {len(response['aggregations'])} aggregations"
        )

    def _read_json_body(self):
        """Read the request body and return it as a dict.

        Raises:
            ValueError: if Content-Length is missing or invalid, the body is
                not valid UTF-8 JSON, or the JSON value is not an object.
        """
        try:
            content_length = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            raise ValueError("Missing or invalid Content-Length header") from None
        if content_length < 0:
            raise ValueError("Missing or invalid Content-Length header")

        post_data = self.rfile.read(content_length)
        try:
            request_data = json.loads(post_data.decode('utf-8'))
        except ValueError as exc:
            # UnicodeDecodeError and json.JSONDecodeError both derive from ValueError.
            raise ValueError(f"Request body is not valid JSON: {exc}") from exc
        if not isinstance(request_data, dict):
            raise ValueError("Request body must be a JSON object")
        return request_data

    def _build_search_response(self, request_data):
        """Build the mock search response dict for a parsed request."""
        query = request_data.get('query') or ''
        size = request_data.get('size', _DEFAULT_SIZE)
        sort_by = request_data.get('sort_by', 'relevance')
        aggregations = request_data.get('aggregations') or {}
        filters = request_data.get('filters') or {}

        print(f"Search request: query='{query}', size={size}, sort_by={sort_by}")
        print(f"Aggregations: {list(aggregations.keys()) if aggregations else 'None'}")
        print(f"Filters: {filters if filters else 'None'}")

        # Simulate processing time
        time.sleep(0.1)

        results = self.generate_mock_results(query, size, sort_by, filters)
        aggregation_results = self.generate_mock_aggregations(aggregations, filters)

        # Report "zh" when the query has any character in U+4E00..U+9FFF.
        has_cjk = any('\u4e00' <= char <= '\u9fff' for char in query)
        return {
            "hits": results,
            "total": len(results) + random.randint(10, 100),
            "max_score": round(random.uniform(1.5, 3.5), 3),
            "took_ms": random.randint(15, 45),
            "aggregations": aggregation_results,
            "query_info": {
                "original_query": query,
                "rewritten_query": query,
                "detected_language": "zh" if has_cjk else "en",
                "domain": "default",
                "translations": {},
                "has_vector": False,
            },
        }

    def _send_json(self, status, payload):
        """Send *payload* as a UTF-8 JSON response with a CORS header."""
        body = json.dumps(payload, ensure_ascii=False, indent=2).encode('utf-8')
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def generate_mock_results(self, query, size, sort_by, filters):
        """Generate mock search results.

        Args:
            query: Search text. Not used to select products.
            size: Maximum number of hits. Values that cannot be converted to
                an int fall back to 10; negative values yield no hits.
            sort_by: 'price_asc', 'price_desc' or 'time_desc'. Any other
                value keeps the catalogue order.
            filters: Optional dict with 'category_name', 'brand_name' and/or
                'price_ranges'. Each value may be a list or a single scalar.

        Returns:
            A list of hit dicts with the keys "_id", "_score" (random) and
            "_source" (a copy of the product record).
        """
        products = list(_SAMPLE_PRODUCTS)

        if filters and isinstance(filters, dict):
            products = [p for p in products if _matches_filters(p, filters)]

        if isinstance(sort_by, str) and sort_by in _SORT_OPTIONS:
            sort_key, reverse = _SORT_OPTIONS[sort_by]
            products.sort(key=sort_key, reverse=reverse)

        limit = _normalize_size(size)
        return [
            {
                "_id": str(product['skuId']),
                "_score": round(random.uniform(1.5, 3.5), 3),
                # Copy so callers cannot mutate the shared catalogue.
                "_source": dict(product),
            }
            for product in products[:limit]
        ]

    def generate_mock_aggregations(self, aggregations, filters):
        """Generate mock aggregation results.

        Args:
            aggregations: Mapping of aggregation name to its spec dict. The
                spec's 'type' is 'terms' (the default) or 'range'; a range
                spec lists its buckets under 'ranges'.
            filters: Accepted for interface compatibility; not used.

        Returns:
            A dict keyed by aggregation name. Specs whose type is neither
            'terms' nor 'range' are omitted.
        """
        if not aggregations:
            return {}

        result = {}
        for agg_name, agg_spec in aggregations.items():
            agg_type = agg_spec.get('type', 'terms')
            if agg_type == 'terms':
                result[agg_name] = _mock_terms_aggregation(agg_name)
            elif agg_type == 'range':
                # Built for every requested range aggregation, not only
                # 'price_ranges', so none is silently dropped.
                result[agg_name] = _mock_range_aggregation(agg_spec.get('ranges') or [])
        return result

    def log_message(self, format, *args):
        """Override to reduce log noise."""
        pass


def run_server(host='', port=6002):
    """Run the API server - main entry point.

    Args:
        host: Interface to bind; the empty string binds all interfaces.
        port: TCP port to listen on.

    Serves until interrupted with Ctrl+C, then closes the listening socket.
    """
    httpd = HTTPServer((host, port), SearchAPIHandler)

    print("🚀 Simple Search API Server started!")
    print(f"📍 API: http://localhost:{port}")
    print(f"🔍 Search endpoint: http://localhost:{port}/search/")
    print(f"🌐 Frontend should connect to: http://localhost:{port}")
    print("⏹️ Press Ctrl+C to stop")

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n🛑 Server stopped")
    finally:
        # Release the socket on any exit path, not only on Ctrl+C.
        httpd.server_close()


if __name__ == '__main__':
    run_server()