translator_app.py 12.6 KB

"""

# 方式1:直接运行
python api/translator_app.py

# 方式2:使用 uvicorn
uvicorn api.translator_app:app --host 0.0.0.0 --port 6006 --reload


使用说明:
Translation HTTP Service

This service provides a RESTful API for text translation using Qwen (default) or DeepL API.
The service runs on port 6006 and provides a simple translation endpoint.

API Endpoint:
    POST /translate

Request Body (JSON):
    {
        "text": "要翻译的文本",
        "target_lang": "en",  # Required: target language code (zh, en, ru, etc.)
        "source_lang": "zh",  # Optional: source language code (auto-detect if not provided)
        "model": "qwen"       # Optional: translation model ("qwen" or "deepl", default: "qwen")
    }

Response (JSON):
    {
        "text": "要翻译的文本",
        "target_lang": "en",
        "source_lang": "zh",
        "translated_text": "Text to translate",
        "status": "success"
    }

Usage Examples:

1. Translate Chinese to English:
   curl -X POST http://localhost:6006/translate \
     -H "Content-Type: application/json" \
     -d '{
       "text": "商品名称",
       "target_lang": "en",
       "source_lang": "zh"
     }'

2. Translate with auto-detection:
   curl -X POST http://localhost:6006/translate \
     -H "Content-Type: application/json" \
     -d '{
       "text": "Product name",
       "target_lang": "zh"
     }'

3. Translate using DeepL model:
   curl -X POST http://localhost:6006/translate \
     -H "Content-Type: application/json" \
     -d '{
       "text": "商品名称",
       "target_lang": "en",
       "source_lang": "zh",
       "model": "deepl"
     }'

4. Translate Russian to English:
   curl -X POST http://localhost:6006/translate \
     -H "Content-Type: application/json" \
     -d '{
       "text": "Название товара",
       "target_lang": "en",
       "source_lang": "ru"
     }'

Health Check:
    GET /health

    curl http://localhost:6006/health

Start the service:
    python api/translator_app.py
    # or
    uvicorn api.translator_app:app --host 0.0.0.0 --port 6006 --reload
"""

import logging
import argparse
import uvicorn
from typing import Dict, List, Optional, Union
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from config.services_config import get_translation_config
from translation.service import TranslationService

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

_translation_service: Optional[TranslationService] = None


def get_translation_service() -> TranslationService:
    global _translation_service
    if _translation_service is None:
        _translation_service = TranslationService(get_translation_config())
    return _translation_service


# Request/Response models
class TranslationRequest(BaseModel):
    """Translation request model."""
    text: Union[str, List[str]] = Field(..., description="Text to translate (string or list of strings)")
    target_lang: str = Field(..., description="Target language code (zh, en, ru, etc.)")
    source_lang: Optional[str] = Field(None, description="Source language code (optional, auto-detect if not provided)")
    model: Optional[str] = Field(None, description="Translation model: qwen-mt | deepl | llm")
    scene: Optional[str] = Field(None, description="Translation scene, paired with model routing")
    context: Optional[str] = Field(None, description="Deprecated alias of scene")
    prompt: Optional[str] = Field(None, description="Optional prompt override")

    class Config:
        json_schema_extra = {
            "example": {
                "text": "商品名称",
                "target_lang": "en",
                "source_lang": "zh",
                "model": "llm",
                "scene": "sku_name"
            }
        }


class TranslationResponse(BaseModel):
    """Translation response model."""
    text: Union[str, List[str]] = Field(..., description="Original text (string or list)")
    target_lang: str = Field(..., description="Target language code")
    source_lang: Optional[str] = Field(None, description="Source language code (detected or provided)")
    translated_text: Union[str, List[Optional[str]]] = Field(
        ...,
        description="Translated text (string or list; list elements may be null on failure)",
    )
    status: str = Field(..., description="Translation status")
    model: str = Field(..., description="Translation model used")
    scene: str = Field(..., description="Translation scene used")


# Create FastAPI app
app = FastAPI(
    title="Translation Service API",
    description="RESTful API for text translation using Qwen (default) or DeepL",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.on_event("startup")
async def startup_event():
    """Initialize translator on startup."""
    logger.info("Starting Translation Service API on port 6006")
    try:
        service = get_translation_service()
        logger.info(
            "Translation service ready | default_model=%s available_models=%s",
            service.config.default_model,
            service.available_models,
        )
    except Exception as e:
        logger.error(f"Failed to initialize translator: {e}", exc_info=True)
        raise


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    try:
        service = get_translation_service()
        return {
            "status": "healthy",
            "service": "translation",
            "default_model": service.config.default_model,
            "default_scene": service.config.default_scene,
            "available_models": service.available_models,
            "enabled_capabilities": service.config.enabled_models,
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "error": str(e)
            }
        )


@app.post("/translate", response_model=TranslationResponse)
async def translate(request: TranslationRequest):
    """
    Translate text to target language.
    
    Uses a fixed prompt optimized for product SKU name translation.
    The translation is cached in Redis for performance.
    
    Supports both Qwen (default) and DeepL models via the 'model' parameter.
    """
    # 允许 text 为字符串或字符串列表
    if isinstance(request.text, list):
        if not request.text:
            raise HTTPException(
                status_code=400,
                detail="Text list cannot be empty"
            )
    else:
        if not request.text or not request.text.strip():
            raise HTTPException(
                status_code=400,
                detail="Text cannot be empty"
            )
    
    if not request.target_lang:
        raise HTTPException(
            status_code=400,
            detail="target_lang is required"
        )
    
    try:
        service = get_translation_service()
        scene = (request.scene or request.context or service.config.default_scene).strip() or "general"
        model = service.config.normalize_model_name(request.model or service.config.default_model)
        translator = service.get_backend(model)
        raw_text = request.text

        # 如果是列表,并且底层 provider 声明支持 batch,则直接传 list
        if isinstance(raw_text, list) and getattr(translator, "supports_batch", False):
            try:
                translated_list = service.translate(
                    text=raw_text,
                    target_lang=request.target_lang,
                    source_lang=request.source_lang,
                    model=model,
                    scene=scene,
                    prompt=request.prompt,
                )
            except Exception as exc:
                logger.error("Batch translation failed: %s", exc, exc_info=True)
                # 回退到逐条拆分逻辑
                translated_list = None

            if translated_list is not None:
                # 规范化为 List[Optional[str]],并保证长度对应
                if not isinstance(translated_list, list):
                    raise HTTPException(
                        status_code=500,
                        detail="Batch translation provider returned non-list result",
                    )
                normalized: List[Optional[str]] = []
                for idx, item in enumerate(raw_text):
                    if idx < len(translated_list):
                        val = translated_list[idx]
                    else:
                        val = None
                    # 失败语义:失败位置为 None
                    normalized.append(val)

                return TranslationResponse(
                    text=raw_text,
                    target_lang=request.target_lang,
                    source_lang=request.source_lang,
                    translated_text=normalized,
                    status="success",
                    model=str(getattr(translator, "model", model)),
                    scene=scene,
                )

        # 否则:统一走逐条拆分逻辑(包括不支持 batch 的 provider)
        if isinstance(raw_text, list):
            results: List[Optional[str]] = []
            for item in raw_text:
                if item is None or not str(item).strip():
                    # 空元素不视为失败,直接返回原值
                    results.append(item)  # type: ignore[arg-type]
                    continue
                try:
                    out = service.translate(
                        text=str(item),
                        target_lang=request.target_lang,
                        source_lang=request.source_lang,
                        model=model,
                        scene=scene,
                        prompt=request.prompt,
                    )
                except Exception as exc:
                    logger.warning("Per-item translation failed: %s", exc, exc_info=True)
                    out = None
                # 失败语义:该元素为 None
                results.append(out)

            return TranslationResponse(
                text=raw_text,
                target_lang=request.target_lang,
                source_lang=request.source_lang,
                translated_text=results,
                status="success",
                model=str(getattr(translator, "model", model)),
                scene=scene,
            )

        # 单文本模式:保持原有严格失败语义
        translated_text = service.translate(
            text=raw_text,
            target_lang=request.target_lang,
            source_lang=request.source_lang,
            model=model,
            scene=scene,
            prompt=request.prompt,
        )

        if translated_text is None:
            raise HTTPException(
                status_code=500,
                detail="Translation failed"
            )

        return TranslationResponse(
            text=raw_text,
            target_lang=request.target_lang,
            source_lang=request.source_lang,
            translated_text=translated_text,
            status="success",
            model=str(getattr(translator, "model", model)),
            scene=scene,
        )
    
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Translation error: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Translation error: {str(e)}"
        )


@app.get("/")
async def root():
    """Root endpoint with API information."""
    return {
        "service": "Translation Service API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "translate": "POST /translate",
            "health": "GET /health",
            "docs": "GET /docs"
        }
    }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Start translation API service')
    parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    parser.add_argument('--port', type=int, default=6006, help='Port to bind to')
    parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
    args = parser.parse_args()

    # Run server
    uvicorn.run(
        "api.translator_app:app",
        host=args.host,
        port=args.port,
        reload=args.reload
    )