"""Translator service HTTP app."""

import argparse
import logging
import os
import pathlib
import time
import uuid
from contextlib import asynccontextmanager
from functools import lru_cache
from logging.handlers import TimedRotatingFileHandler
from typing import List, Optional, Union

import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ConfigDict, Field

from config.services_config import get_translation_config
from translation.logging_utils import (
    TranslationRequestFilter,
    bind_translation_request_id,
    reset_translation_request_id,
)
from translation.service import TranslationService
from translation.settings import (
    get_enabled_translation_models,
    normalize_translation_model,
    normalize_translation_scene,
)


def _resolve_log_level(default: int = logging.INFO) -> int:
    """Return the numeric log level named by the ``LOG_LEVEL`` env variable.

    Falls back to *default* when the variable is unset, names no attribute of
    the ``logging`` module, or names an attribute that is not an integer
    (e.g. ``BASIC_FORMAT``, which is a format string and would make
    ``setLevel`` raise).
    """
    level_name = os.getenv("LOG_LEVEL", "INFO").strip().upper()
    level = getattr(logging, level_name, default)
    return level if isinstance(level, int) else default


def _detach_handlers(target: logging.Logger) -> None:
    """Remove and close every handler currently attached to *target*.

    Closing (rather than only clearing the list) releases the file
    descriptors held by file handlers when logging is reconfigured.
    """
    for handler in list(target.handlers):
        target.removeHandler(handler)
        handler.close()


def _build_daily_file_handler(
    path: pathlib.Path,
    level: int,
    formatter: logging.Formatter,
    request_filter: logging.Filter,
) -> TimedRotatingFileHandler:
    """Create a file handler that rotates at midnight and keeps 30 backups."""
    handler = TimedRotatingFileHandler(
        filename=path,
        when="midnight",
        interval=1,
        backupCount=30,
        encoding="utf-8",
    )
    handler.setLevel(level)
    handler.setFormatter(formatter)
    handler.addFilter(request_filter)
    return handler


def configure_translator_logging() -> None:
    """Configure root and verbose loggers for the translator service.

    * Ensures ``logs/`` and ``logs/verbose/`` exist (relative to the CWD).
    * Replaces all root-logger handlers with a console handler and a daily
      rotating ``logs/translator_api.log`` handler.
    * Replaces all handlers of the ``translator.verbose`` logger with a daily
      rotating ``logs/verbose/translator_verbose.log`` handler and disables
      propagation so verbose records do not reach the root handlers.

    The level comes from the ``LOG_LEVEL`` environment variable (default
    ``INFO``). Every handler gets a ``TranslationRequestFilter``; the format
    string references ``%(reqid)s``, which that filter presumably injects
    (defined in ``translation.logging_utils`` - confirm there).
    """
    log_dir = pathlib.Path("logs")
    verbose_dir = log_dir / "verbose"
    # parents=True creates ``logs/`` as well, so a single call is enough.
    verbose_dir.mkdir(parents=True, exist_ok=True)

    numeric_level = _resolve_log_level()
    formatter = logging.Formatter(
        "%(asctime)s | reqid:%(reqid)s | %(name)s | %(levelname)s | %(message)s"
    )
    request_filter = TranslationRequestFilter()

    root_logger = logging.getLogger()
    root_logger.setLevel(numeric_level)
    _detach_handlers(root_logger)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(numeric_level)
    console_handler.setFormatter(formatter)
    console_handler.addFilter(request_filter)
    root_logger.addHandler(console_handler)

    root_logger.addHandler(
        _build_daily_file_handler(
            log_dir / "translator_api.log",
            numeric_level,
            formatter,
            request_filter,
        )
    )

    verbose_logger = logging.getLogger("translator.verbose")
    verbose_logger.setLevel(numeric_level)
    _detach_handlers(verbose_logger)
    verbose_logger.propagate = False
    verbose_logger.addHandler(
        _build_daily_file_handler(
            verbose_dir / "translator_verbose.log",
            numeric_level,
            formatter,
            request_filter,
        )
    )


# Logging is configured at import time so that every logger created below
# (and in imported modules that log later) uses the handlers set up above.
configure_translator_logging()

logger = logging.getLogger(__name__)
verbose_logger = logging.getLogger("translator.verbose")


@lru_cache(maxsize=1)
def get_translation_service() -> TranslationService:
    """Return the process-wide ``TranslationService``, building it on first use."""
    return TranslationService(get_translation_config())


# Request/Response models
class TranslationRequest(BaseModel):
    """Translation request model."""

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "text": "商品名称",
                "target_lang": "en",
                "source_lang": "zh",
                "model": "llm",
                "scene": "sku_name",
            }
        }
    )

    text: Union[str, List[str]] = Field(..., description="Text to translate (string or list of strings)")
    target_lang: str = Field(..., description="Target language code (zh, en, ru, etc.)")
    source_lang: Optional[str] = Field(None, description="Source language code (optional, auto-detect if not provided)")
    model: Optional[str] = Field(None, description="Enabled translation capability name")
    scene: Optional[str] = Field(None, description="Translation scene, paired with model routing")


class TranslationResponse(BaseModel):
    """Translation response model."""

    text: Union[str, List[str]] = Field(..., description="Original text (string or list)")
    target_lang: str = Field(..., description="Target language code")
    source_lang: Optional[str] = Field(None, description="Source language code (detected or provided)")
    translated_text: Union[str, List[Optional[str]]] = Field(
        ...,
        description="Translated text (string or list; list elements may be null on failure)",
    )
    status: str = Field(..., description="Translation status")
    model: str = Field(..., description="Translation model used")
    scene: str = Field(..., description="Translation scene used")


def _normalize_scene(service: TranslationService, scene: Optional[str]) -> str:
    """Normalize the requested scene against the service configuration."""
    return normalize_translation_scene(service.config, scene)


def _normalize_model(service: TranslationService, model: Optional[str]) -> str:
    """Normalize the requested model, using the configured default when omitted."""
    return normalize_translation_model(service.config, model or service.config["default_model"])


def _ensure_valid_text(text: Union[str, List[str]]) -> None:
    """Reject empty input with HTTP 400.

    A list only has to be non-empty (individual items are not inspected
    here); a single string must contain at least one non-whitespace character.

    Raises:
        HTTPException: 400 when the list is empty or the string is blank.
    """
    if isinstance(text, list):
        if not text:
            raise HTTPException(status_code=400, detail="Text list cannot be empty")
        return
    if not text or not text.strip():
        raise HTTPException(status_code=400, detail="Text cannot be empty")


def _normalize_batch_result(
    original: List[str],
    translated: Union[str, List[Optional[str]], None],
) -> List[Optional[str]]:
    """Align a backend batch result with the request list.

    Returns a list exactly as long as *original*: ``None`` for every item when
    the backend returned ``None``, otherwise the backend's items truncated or
    padded with ``None`` to the request length.

    Raises:
        HTTPException: 500 when the backend returned a non-list, non-``None``
            value.
    """
    if translated is None:
        return [None for _ in original]
    if not isinstance(translated, list):
        raise HTTPException(status_code=500, detail="Batch translation provider returned non-list result")
    expected = len(original)
    aligned: List[Optional[str]] = list(translated[:expected])
    aligned.extend([None] * (expected - len(aligned)))
    return aligned


def _text_preview(text: Optional[str], limit: int = 20) -> str:
    """Return at most *limit* characters of *text* with newlines escaped for logs."""
    normalized = str(text or "").replace("\n", "\\n")
    return normalized[:limit]


def _request_metrics(text: Union[str, List[str]]) -> dict:
    """Summarize a request payload (item count, lengths, first-item preview) for logging."""
    if isinstance(text, list):
        lengths = [len(str(item or "")) for item in text]
        return {
            "request_count": len(text),
            "lengths": lengths,
            "first_preview": _text_preview(text[0] if text else ""),
        }
    return {
        "request_count": 1,
        "lengths": [len(str(text or ""))],
        "first_preview": _text_preview(str(text or "")),
    }


def _result_preview(translated: Union[str, List[Optional[str]], None]) -> str:
    """Return a short log preview of a translation result (first item for lists)."""
    if isinstance(translated, list):
        if not translated:
            return ""
        first = translated[0]
        return _text_preview("" if first is None else str(first))
    if translated is None:
        return ""
    return _text_preview(str(translated))


def _resolve_request_id(http_request: Request) -> str:
    """Pick the request id used for log correlation.

    Uses the stripped ``X-Request-ID`` header (capped at 32 characters) when
    it is present and non-blank; otherwise generates the first 8 characters
    of a random UUID4.
    """
    header_value = http_request.headers.get("X-Request-ID")
    if header_value and header_value.strip():
        return header_value.strip()[:32]
    return str(uuid.uuid4())[:8]


def _elapsed_ms(started: float) -> float:
    """Return milliseconds elapsed since *started* (a ``time.perf_counter()`` value)."""
    return (time.perf_counter() - started) * 1000


def _translate_batch(
    service: TranslationService,
    raw_text: List[str],
    *,
    target_lang: str,
    source_lang: Optional[str],
    model: str,
    scene: str,
) -> List[Optional[str]]:
    """Translate a list of texts, preferring the backend's native batch mode.

    When the backend exposes a truthy ``supports_batch`` attribute, the whole
    list is sent in one call. If that call fails with anything other than
    ``ValueError``, the failure is logged and the function falls back to
    translating item by item.

    In per-item mode, ``None``/blank items are passed through untouched and an
    item whose translation fails (non-``ValueError``) yields ``None``.

    Returns:
        A list with one entry per input item.

    Raises:
        ValueError: propagated unchanged from ``service.translate`` so the
            caller can map it to a client error.
    """
    backend = service.get_backend(model)
    supports_batch = bool(getattr(backend, "supports_batch", False))
    logger.info(
        "Translation batch dispatch | execution=%s count=%s lengths=%s first_preview=%s",
        "backend-batch" if supports_batch else "per-item",
        len(raw_text),
        [len(str(item or "")) for item in raw_text],
        _text_preview(raw_text[0] if raw_text else ""),
    )

    if supports_batch:
        try:
            translated = service.translate(
                text=raw_text,
                target_lang=target_lang,
                source_lang=source_lang,
                model=model,
                scene=scene,
            )
            verbose_logger.info(
                "Translation batch result | model=%s scene=%s count=%s first_result=%s",
                model,
                scene,
                len(raw_text),
                _result_preview(translated),
            )
            return _normalize_batch_result(raw_text, translated)
        except ValueError:
            raise
        except Exception as exc:
            # Deliberate best-effort: any other failure - including the
            # HTTPException raised by _normalize_batch_result for a malformed
            # result - is logged and handled by the per-item loop below.
            logger.error("Batch translation failed: %s", exc, exc_info=True)

    results: List[Optional[str]] = []
    for item in raw_text:
        if item is None or not str(item).strip():
            # Nothing to translate; echo the item back unchanged.
            results.append(item)  # type: ignore[arg-type]
            continue
        try:
            out = service.translate(
                text=str(item),
                target_lang=target_lang,
                source_lang=source_lang,
                model=model,
                scene=scene,
            )
        except ValueError:
            raise
        except Exception as exc:
            logger.warning(
                "Per-item translation failed | model=%s scene=%s target_lang=%s source_lang=%s item_len=%s item_preview=%s error=%s",
                model,
                scene,
                target_lang,
                source_lang or "auto",
                len(str(item or "")),
                _text_preview(str(item or "")),
                exc,
                exc_info=True,
            )
            out = None
        results.append(out)
    return results


@asynccontextmanager
async def lifespan(_: FastAPI):
    """Create the shared translation service on startup and log its state.

    Building the service here, rather than lazily on the first request, makes
    construction failures surface at process startup.
    """
    logger.info("Starting Translation Service API")
    service = get_translation_service()
    logger.info(
        "Translation service ready | default_model=%s default_scene=%s available_models=%s loaded_models=%s",
        service.config["default_model"],
        service.config["default_scene"],
        service.available_models,
        service.loaded_models,
    )
    logger.info(
        "Translation backends initialized on startup | models=%s",
        service.loaded_models,
    )
    verbose_logger.info(
        "Translation startup detail | capabilities=%s cache_ttl_seconds=%s cache_sliding_expiration=%s",
        service.available_models,
        service.config["cache"]["ttl_seconds"],
        service.config["cache"]["sliding_expiration"],
    )
    yield


# Create FastAPI app
app = FastAPI(
    title="Translation Service API",
    description="Translation service with pluggable capabilities and scene routing",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan,
)

# Add CORS middleware
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive - confirm this is intended for the deployment environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/health")
async def health_check():
    """Health check endpoint.

    Returns the service's default model/scene and model lists, or a 503 JSON
    payload with the error message if the service cannot be obtained or
    inspected.
    """
    try:
        service = get_translation_service()
        logger.info(
            "Health check | default_model=%s default_scene=%s loaded_models=%s",
            service.config["default_model"],
            service.config["default_scene"],
            service.loaded_models,
        )
        return {
            "status": "healthy",
            "service": "translation",
            "default_model": service.config["default_model"],
            "default_scene": service.config["default_scene"],
            "available_models": service.available_models,
            "enabled_capabilities": get_enabled_translation_models(service.config),
            "loaded_models": service.loaded_models,
        }
    except Exception as e:
        # Lazy %-formatting plus traceback, consistent with the other error logs.
        logger.error("Health check failed: %s", e, exc_info=True)
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "error": str(e),
            },
        )


@app.post("/translate", response_model=TranslationResponse)
async def translate(request: TranslationRequest, http_request: Request):
    """Translate a single string or a list of strings.

    Flow: validate the payload, bind a request id for log correlation,
    normalize scene/model, then dispatch either to ``_translate_batch`` (list
    input) or directly to ``service.translate`` (string input).

    Raises:
        HTTPException: 400 for empty text, missing ``target_lang`` or a
            ``ValueError`` raised during normalization/translation; 500 when
            a single translation returns ``None`` or any other error occurs.
    """
    _ensure_valid_text(request.text)
    if not request.target_lang:
        raise HTTPException(status_code=400, detail="target_lang is required")

    # The returned token is required to restore the previous binding in `finally`.
    _, request_token = bind_translation_request_id(_resolve_request_id(http_request))
    request_started = time.perf_counter()
    try:
        service = get_translation_service()
        scene = _normalize_scene(service, request.scene)
        model = _normalize_model(service, request.model)
        translator = service.get_backend(model)
        # Report the backend's own `model` attribute when it has one.
        backend_name = getattr(translator, "model", model)
        raw_text = request.text
        metrics = _request_metrics(raw_text)

        logger.info(
            "Translation request | model=%s scene=%s target_lang=%s source_lang=%s count=%s lengths=%s first_preview=%s backend=%s",
            model,
            scene,
            request.target_lang,
            request.source_lang or "auto",
            metrics["request_count"],
            metrics["lengths"],
            metrics["first_preview"],
            backend_name,
        )
        verbose_logger.info(
            "Translation request detail | model=%s scene=%s target_lang=%s source_lang=%s payload=%s",
            model,
            scene,
            request.target_lang,
            request.source_lang or "auto",
            raw_text,
        )

        # NOTE(review): the translation calls below run synchronously inside
        # this coroutine; if they block (network / model inference) the event
        # loop is stalled for their duration - confirm whether they should be
        # offloaded to a worker thread.
        if isinstance(raw_text, list):
            results = _translate_batch(
                service,
                raw_text,
                target_lang=request.target_lang,
                source_lang=request.source_lang,
                model=model,
                scene=scene,
            )
            latency_ms = _elapsed_ms(request_started)
            logger.info(
                "Translation response | model=%s scene=%s count=%s first_result=%s latency_ms=%.2f",
                model,
                scene,
                len(raw_text),
                _result_preview(results),
                latency_ms,
            )
            verbose_logger.info(
                "Translation response detail | model=%s scene=%s translated=%s latency_ms=%.2f",
                model,
                scene,
                results,
                latency_ms,
            )
            return TranslationResponse(
                text=raw_text,
                target_lang=request.target_lang,
                source_lang=request.source_lang,
                translated_text=results,
                status="success",
                model=str(backend_name),
                scene=scene,
            )

        translated_text = service.translate(
            text=raw_text,
            target_lang=request.target_lang,
            source_lang=request.source_lang,
            model=model,
            scene=scene,
        )
        if translated_text is None:
            raise HTTPException(status_code=500, detail="Translation failed")

        latency_ms = _elapsed_ms(request_started)
        logger.info(
            "Translation response | model=%s scene=%s count=1 first_result=%s latency_ms=%.2f",
            model,
            scene,
            _result_preview(translated_text),
            latency_ms,
        )
        verbose_logger.info(
            "Translation response detail | model=%s scene=%s translated=%s latency_ms=%.2f",
            model,
            scene,
            translated_text,
            latency_ms,
        )
        return TranslationResponse(
            text=raw_text,
            target_lang=request.target_lang,
            source_lang=request.source_lang,
            translated_text=translated_text,
            status="success",
            model=str(backend_name),
            scene=scene,
        )
    except HTTPException as exc:
        # Already an HTTP error: log it and let FastAPI render it unchanged.
        latency_ms = _elapsed_ms(request_started)
        logger.warning(
            "Translation request failed | status_code=%s detail=%s latency_ms=%.2f",
            exc.status_code,
            exc.detail,
            latency_ms,
        )
        raise
    except ValueError as e:
        # ValueError is treated as a client-side validation problem.
        latency_ms = _elapsed_ms(request_started)
        logger.warning("Translation validation error | error=%s latency_ms=%.2f", e, latency_ms)
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        latency_ms = _elapsed_ms(request_started)
        logger.error("Translation error | error=%s latency_ms=%.2f", e, latency_ms, exc_info=True)
        # Chain the cause, consistent with the ValueError branch above.
        raise HTTPException(status_code=500, detail=f"Translation error: {e}") from e
    finally:
        reset_translation_request_id(request_token)


@app.get("/")
async def root():
    """Root endpoint with API information."""
    return {
        "service": "Translation Service API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "translate": "POST /translate",
            "health": "GET /health",
            "docs": "GET /docs",
        },
    }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Start translation API service")
    parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
    parser.add_argument("--port", type=int, default=6006, help="Port to bind to")
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload")
    args = parser.parse_args()

    # Run server
    uvicorn.run(
        "api.translator_app:app",
        host=args.host,
        port=args.port,
        reload=args.reload,
    )