Blame view

api/translator_app.py 16.8 KB
0fd2f875   tangwang   translate
1
  """Translator service HTTP app."""
768ad710   tangwang   MySQL到ES字段映射说明-业务...
2
  
768ad710   tangwang   MySQL到ES字段映射说明-业务...
3
  import argparse
0fd2f875   tangwang   translate
4
  import logging
cd4ce66d   tangwang   trans logs
5
6
7
  import os
  import pathlib
  import time
14e67b71   tangwang   分句后的 batching 现在是...
8
  import uuid
0fd2f875   tangwang   translate
9
10
  from contextlib import asynccontextmanager
  from functools import lru_cache
cd4ce66d   tangwang   trans logs
11
  from logging.handlers import TimedRotatingFileHandler
0fd2f875   tangwang   translate
12
13
  from typing import List, Optional, Union
  
768ad710   tangwang   MySQL到ES字段映射说明-业务...
14
  import uvicorn
14e67b71   tangwang   分句后的 batching 现在是...
15
  from fastapi import FastAPI, HTTPException, Request
768ad710   tangwang   MySQL到ES字段映射说明-业务...
16
  from fastapi.middleware.cors import CORSMiddleware
0fd2f875   tangwang   translate
17
18
  from fastapi.responses import JSONResponse
  from pydantic import BaseModel, ConfigDict, Field
768ad710   tangwang   MySQL到ES字段映射说明-业务...
19
  
d4cadc13   tangwang   翻译重构
20
  from config.services_config import get_translation_config
14e67b71   tangwang   分句后的 batching 现在是...
21
22
23
24
25
  from translation.logging_utils import (
      TranslationRequestFilter,
      bind_translation_request_id,
      reset_translation_request_id,
  )
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
26
  from translation.service import TranslationService
0fd2f875   tangwang   translate
27
28
29
30
31
  from translation.settings import (
      get_enabled_translation_models,
      normalize_translation_model,
      normalize_translation_scene,
  )
768ad710   tangwang   MySQL到ES字段映射说明-业务...
32
  
cd4ce66d   tangwang   trans logs
33
34
35
36
37
38
39
40
41
  
  def configure_translator_logging() -> None:
      log_dir = pathlib.Path("logs")
      verbose_dir = log_dir / "verbose"
      log_dir.mkdir(exist_ok=True)
      verbose_dir.mkdir(parents=True, exist_ok=True)
  
      log_level = os.getenv("LOG_LEVEL", "INFO").upper()
      numeric_level = getattr(logging, log_level, logging.INFO)
14e67b71   tangwang   分句后的 batching 现在是...
42
43
      formatter = logging.Formatter("%(asctime)s | reqid:%(reqid)s | %(name)s | %(levelname)s | %(message)s")
      request_filter = TranslationRequestFilter()
cd4ce66d   tangwang   trans logs
44
45
46
47
48
49
50
51
  
      root_logger = logging.getLogger()
      root_logger.setLevel(numeric_level)
      root_logger.handlers.clear()
  
      console_handler = logging.StreamHandler()
      console_handler.setLevel(numeric_level)
      console_handler.setFormatter(formatter)
14e67b71   tangwang   分句后的 batching 现在是...
52
      console_handler.addFilter(request_filter)
cd4ce66d   tangwang   trans logs
53
54
55
56
57
58
59
60
61
62
63
      root_logger.addHandler(console_handler)
  
      file_handler = TimedRotatingFileHandler(
          filename=log_dir / "translator_api.log",
          when="midnight",
          interval=1,
          backupCount=30,
          encoding="utf-8",
      )
      file_handler.setLevel(numeric_level)
      file_handler.setFormatter(formatter)
14e67b71   tangwang   分句后的 batching 现在是...
64
      file_handler.addFilter(request_filter)
cd4ce66d   tangwang   trans logs
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
      root_logger.addHandler(file_handler)
  
      verbose_logger = logging.getLogger("translator.verbose")
      verbose_logger.setLevel(numeric_level)
      verbose_logger.handlers.clear()
      verbose_logger.propagate = False
  
      verbose_handler = TimedRotatingFileHandler(
          filename=verbose_dir / "translator_verbose.log",
          when="midnight",
          interval=1,
          backupCount=30,
          encoding="utf-8",
      )
      verbose_handler.setLevel(numeric_level)
      verbose_handler.setFormatter(formatter)
14e67b71   tangwang   分句后的 batching 现在是...
81
      verbose_handler.addFilter(request_filter)
cd4ce66d   tangwang   trans logs
82
83
84
85
      verbose_logger.addHandler(verbose_handler)
  
  
  configure_translator_logging()
768ad710   tangwang   MySQL到ES字段映射说明-业务...
86
  logger = logging.getLogger(__name__)
cd4ce66d   tangwang   trans logs
87
  verbose_logger = logging.getLogger("translator.verbose")
768ad710   tangwang   MySQL到ES字段映射说明-业务...
88
  
768ad710   tangwang   MySQL到ES字段映射说明-业务...
89
  
0fd2f875   tangwang   translate
90
  @lru_cache(maxsize=1)
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
91
  def get_translation_service() -> TranslationService:
0fd2f875   tangwang   translate
92
      return TranslationService(get_translation_config())
768ad710   tangwang   MySQL到ES字段映射说明-业务...
93
94
95
96
97
  
  
  # Request/Response models
  class TranslationRequest(BaseModel):
      """Translation request model."""
768ad710   tangwang   MySQL到ES字段映射说明-业务...
98
  
0fd2f875   tangwang   translate
99
100
      model_config = ConfigDict(
          json_schema_extra={
768ad710   tangwang   MySQL到ES字段映射说明-业务...
101
102
103
              "example": {
                  "text": "商品名称",
                  "target_lang": "en",
3cd09b3b   tangwang   翻译接口改为调用qwen-mt-f...
104
                  "source_lang": "zh",
d4cadc13   tangwang   翻译重构
105
                  "model": "llm",
0fd2f875   tangwang   translate
106
                  "scene": "sku_name",
768ad710   tangwang   MySQL到ES字段映射说明-业务...
107
108
              }
          }
0fd2f875   tangwang   translate
109
110
111
112
113
114
115
      )
  
      text: Union[str, List[str]] = Field(..., description="Text to translate (string or list of strings)")
      target_lang: str = Field(..., description="Target language code (zh, en, ru, etc.)")
      source_lang: Optional[str] = Field(None, description="Source language code (optional, auto-detect if not provided)")
      model: Optional[str] = Field(None, description="Enabled translation capability name")
      scene: Optional[str] = Field(None, description="Translation scene, paired with model routing")
768ad710   tangwang   MySQL到ES字段映射说明-业务...
116
117
118
119
  
  
  class TranslationResponse(BaseModel):
      """Translation response model."""
6f7840cf   tangwang   refactor: rename ...
120
      text: Union[str, List[str]] = Field(..., description="Original text (string or list)")
768ad710   tangwang   MySQL到ES字段映射说明-业务...
121
122
      target_lang: str = Field(..., description="Target language code")
      source_lang: Optional[str] = Field(None, description="Source language code (detected or provided)")
6f7840cf   tangwang   refactor: rename ...
123
124
125
126
      translated_text: Union[str, List[Optional[str]]] = Field(
          ...,
          description="Translated text (string or list; list elements may be null on failure)",
      )
768ad710   tangwang   MySQL到ES字段映射说明-业务...
127
      status: str = Field(..., description="Translation status")
3cd09b3b   tangwang   翻译接口改为调用qwen-mt-f...
128
      model: str = Field(..., description="Translation model used")
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
129
      scene: str = Field(..., description="Translation scene used")
768ad710   tangwang   MySQL到ES字段映射说明-业务...
130
131
  
  
0fd2f875   tangwang   translate
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
  def _normalize_scene(service: TranslationService, scene: Optional[str]) -> str:
      return normalize_translation_scene(service.config, scene)
  
  
  def _normalize_model(service: TranslationService, model: Optional[str]) -> str:
      return normalize_translation_model(service.config, model or service.config["default_model"])
  
  
  def _ensure_valid_text(text: Union[str, List[str]]) -> None:
      if isinstance(text, list):
          if not text:
              raise HTTPException(status_code=400, detail="Text list cannot be empty")
          return
      if not text or not text.strip():
          raise HTTPException(status_code=400, detail="Text cannot be empty")
  
  
  def _normalize_batch_result(
      original: List[str],
      translated: Union[str, List[Optional[str]], None],
  ) -> List[Optional[str]]:
      if translated is None:
          return [None for _ in original]
      if not isinstance(translated, list):
          raise HTTPException(status_code=500, detail="Batch translation provider returned non-list result")
      return [translated[idx] if idx < len(translated) else None for idx, _ in enumerate(original)]
  
  
cd4ce66d   tangwang   trans logs
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
  def _text_preview(text: Optional[str], limit: int = 20) -> str:
      normalized = str(text or "").replace("\n", "\\n")
      return normalized[:limit]
  
  
  def _request_metrics(text: Union[str, List[str]]) -> dict:
      if isinstance(text, list):
          lengths = [len(str(item or "")) for item in text]
          return {
              "request_count": len(text),
              "lengths": lengths,
              "first_preview": _text_preview(text[0] if text else ""),
          }
      return {
          "request_count": 1,
          "lengths": [len(str(text or ""))],
          "first_preview": _text_preview(str(text or "")),
      }
  
  
  def _result_preview(translated: Union[str, List[Optional[str]], None]) -> str:
      if isinstance(translated, list):
          if not translated:
              return ""
          first = translated[0]
          return _text_preview("" if first is None else str(first))
      if translated is None:
          return ""
      return _text_preview(str(translated))
  
  
14e67b71   tangwang   分句后的 batching 现在是...
191
192
193
194
195
196
197
  def _resolve_request_id(http_request: Request) -> str:
      header_value = http_request.headers.get("X-Request-ID")
      if header_value and header_value.strip():
          return header_value.strip()[:32]
      return str(uuid.uuid4())[:8]
  
  
0fd2f875   tangwang   translate
198
199
200
201
202
203
204
205
206
207
  def _translate_batch(
      service: TranslationService,
      raw_text: List[str],
      *,
      target_lang: str,
      source_lang: Optional[str],
      model: str,
      scene: str,
  ) -> List[Optional[str]]:
      backend = service.get_backend(model)
cd4ce66d   tangwang   trans logs
208
      logger.info(
14e67b71   tangwang   分句后的 batching 现在是...
209
210
          "Translation batch dispatch | execution=%s count=%s lengths=%s first_preview=%s",
          "backend-batch" if getattr(backend, "supports_batch", False) else "per-item",
cd4ce66d   tangwang   trans logs
211
212
213
          len(raw_text),
          [len(str(item or "")) for item in raw_text],
          _text_preview(raw_text[0] if raw_text else ""),
cd4ce66d   tangwang   trans logs
214
      )
0fd2f875   tangwang   translate
215
216
217
218
219
220
221
222
223
      if getattr(backend, "supports_batch", False):
          try:
              translated = service.translate(
                  text=raw_text,
                  target_lang=target_lang,
                  source_lang=source_lang,
                  model=model,
                  scene=scene,
              )
cd4ce66d   tangwang   trans logs
224
225
226
227
228
229
230
              verbose_logger.info(
                  "Translation batch result | model=%s scene=%s count=%s first_result=%s",
                  model,
                  scene,
                  len(raw_text),
                  _result_preview(translated),
              )
0fd2f875   tangwang   translate
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
              return _normalize_batch_result(raw_text, translated)
          except ValueError:
              raise
          except Exception as exc:
              logger.error("Batch translation failed: %s", exc, exc_info=True)
  
      results: List[Optional[str]] = []
      for item in raw_text:
          if item is None or not str(item).strip():
              results.append(item)  # type: ignore[arg-type]
              continue
          try:
              out = service.translate(
                  text=str(item),
                  target_lang=target_lang,
                  source_lang=source_lang,
                  model=model,
                  scene=scene,
              )
          except ValueError:
              raise
          except Exception as exc:
cd4ce66d   tangwang   trans logs
253
254
255
256
257
258
259
260
261
262
263
              logger.warning(
                  "Per-item translation failed | model=%s scene=%s target_lang=%s source_lang=%s item_len=%s item_preview=%s error=%s",
                  model,
                  scene,
                  target_lang,
                  source_lang or "auto",
                  len(str(item or "")),
                  _text_preview(str(item or "")),
                  exc,
                  exc_info=True,
              )
0fd2f875   tangwang   translate
264
265
266
267
268
269
270
              out = None
          results.append(out)
      return results
  
  
  @asynccontextmanager
  async def lifespan(_: FastAPI):
cd4ce66d   tangwang   trans logs
271
      """Initialize all enabled translation backends on process startup."""
0fd2f875   tangwang   translate
272
273
      logger.info("Starting Translation Service API")
      service = get_translation_service()
0fd2f875   tangwang   translate
274
      logger.info(
cd4ce66d   tangwang   trans logs
275
          "Translation service ready | default_model=%s default_scene=%s available_models=%s loaded_models=%s",
0fd2f875   tangwang   translate
276
          service.config["default_model"],
cd4ce66d   tangwang   trans logs
277
          service.config["default_scene"],
0fd2f875   tangwang   translate
278
279
280
281
          service.available_models,
          service.loaded_models,
      )
      logger.info(
cd4ce66d   tangwang   trans logs
282
283
284
285
286
287
288
289
          "Translation backends initialized on startup | models=%s",
          service.loaded_models,
      )
      verbose_logger.info(
          "Translation startup detail | capabilities=%s cache_ttl_seconds=%s cache_sliding_expiration=%s",
          service.available_models,
          service.config["cache"]["ttl_seconds"],
          service.config["cache"]["sliding_expiration"],
0fd2f875   tangwang   translate
290
291
292
293
      )
      yield
  
  
768ad710   tangwang   MySQL到ES字段映射说明-业务...
294
295
296
  # Create FastAPI app
  app = FastAPI(
      title="Translation Service API",
0fd2f875   tangwang   translate
297
      description="Translation service with pluggable capabilities and scene routing",
768ad710   tangwang   MySQL到ES字段映射说明-业务...
298
299
      version="1.0.0",
      docs_url="/docs",
0fd2f875   tangwang   translate
300
301
      redoc_url="/redoc",
      lifespan=lifespan,
768ad710   tangwang   MySQL到ES字段映射说明-业务...
302
303
304
305
306
307
308
309
310
311
312
313
  )
  
  # Add CORS middleware
  app.add_middleware(
      CORSMiddleware,
      allow_origins=["*"],
      allow_credentials=True,
      allow_methods=["*"],
      allow_headers=["*"],
  )
  
  
768ad710   tangwang   MySQL到ES字段映射说明-业务...
314
315
316
317
  @app.get("/health")
  async def health_check():
      """Health check endpoint."""
      try:
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
318
          service = get_translation_service()
cd4ce66d   tangwang   trans logs
319
320
321
322
323
324
          logger.info(
              "Health check | default_model=%s default_scene=%s loaded_models=%s",
              service.config["default_model"],
              service.config["default_scene"],
              service.loaded_models,
          )
768ad710   tangwang   MySQL到ES字段映射说明-业务...
325
326
327
          return {
              "status": "healthy",
              "service": "translation",
0fd2f875   tangwang   translate
328
329
              "default_model": service.config["default_model"],
              "default_scene": service.config["default_scene"],
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
330
              "available_models": service.available_models,
0fd2f875   tangwang   translate
331
332
              "enabled_capabilities": get_enabled_translation_models(service.config),
              "loaded_models": service.loaded_models,
768ad710   tangwang   MySQL到ES字段映射说明-业务...
333
334
335
336
337
338
339
340
341
342
343
344
345
          }
      except Exception as e:
          logger.error(f"Health check failed: {e}")
          return JSONResponse(
              status_code=503,
              content={
                  "status": "unhealthy",
                  "error": str(e)
              }
          )
  
  
  @app.post("/translate", response_model=TranslationResponse)
14e67b71   tangwang   分句后的 batching 现在是...
346
  async def translate(request: TranslationRequest, http_request: Request):
0fd2f875   tangwang   translate
347
348
      _ensure_valid_text(request.text)
  
768ad710   tangwang   MySQL到ES字段映射说明-业务...
349
      if not request.target_lang:
0fd2f875   tangwang   translate
350
351
          raise HTTPException(status_code=400, detail="target_lang is required")
  
14e67b71   tangwang   分句后的 batching 现在是...
352
      _, request_token = bind_translation_request_id(_resolve_request_id(http_request))
cd4ce66d   tangwang   trans logs
353
      request_started = time.perf_counter()
768ad710   tangwang   MySQL到ES字段映射说明-业务...
354
      try:
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
355
          service = get_translation_service()
0fd2f875   tangwang   translate
356
357
          scene = _normalize_scene(service, request.scene)
          model = _normalize_model(service, request.model)
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
358
          translator = service.get_backend(model)
6f7840cf   tangwang   refactor: rename ...
359
          raw_text = request.text
cd4ce66d   tangwang   trans logs
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
          metrics = _request_metrics(raw_text)
          logger.info(
              "Translation request | model=%s scene=%s target_lang=%s source_lang=%s count=%s lengths=%s first_preview=%s backend=%s",
              model,
              scene,
              request.target_lang,
              request.source_lang or "auto",
              metrics["request_count"],
              metrics["lengths"],
              metrics["first_preview"],
              getattr(translator, "model", model),
          )
          verbose_logger.info(
              "Translation request detail | model=%s scene=%s target_lang=%s source_lang=%s payload=%s",
              model,
              scene,
              request.target_lang,
              request.source_lang or "auto",
              raw_text,
          )
6f7840cf   tangwang   refactor: rename ...
380
  
6f7840cf   tangwang   refactor: rename ...
381
          if isinstance(raw_text, list):
0fd2f875   tangwang   translate
382
383
384
385
386
387
388
389
              results = _translate_batch(
                  service,
                  raw_text,
                  target_lang=request.target_lang,
                  source_lang=request.source_lang,
                  model=model,
                  scene=scene,
              )
cd4ce66d   tangwang   trans logs
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
              latency_ms = (time.perf_counter() - request_started) * 1000
              logger.info(
                  "Translation response | model=%s scene=%s count=%s first_result=%s latency_ms=%.2f",
                  model,
                  scene,
                  len(raw_text),
                  _result_preview(results),
                  latency_ms,
              )
              verbose_logger.info(
                  "Translation response detail | model=%s scene=%s translated=%s latency_ms=%.2f",
                  model,
                  scene,
                  results,
                  latency_ms,
              )
6f7840cf   tangwang   refactor: rename ...
406
407
408
409
410
411
412
              return TranslationResponse(
                  text=raw_text,
                  target_lang=request.target_lang,
                  source_lang=request.source_lang,
                  translated_text=results,
                  status="success",
                  model=str(getattr(translator, "model", model)),
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
413
                  scene=scene,
6f7840cf   tangwang   refactor: rename ...
414
415
              )
  
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
416
          translated_text = service.translate(
6f7840cf   tangwang   refactor: rename ...
417
              text=raw_text,
768ad710   tangwang   MySQL到ES字段映射说明-业务...
418
419
              target_lang=request.target_lang,
              source_lang=request.source_lang,
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
420
421
              model=model,
              scene=scene,
768ad710   tangwang   MySQL到ES字段映射说明-业务...
422
          )
6f7840cf   tangwang   refactor: rename ...
423
  
768ad710   tangwang   MySQL到ES字段映射说明-业务...
424
          if translated_text is None:
0fd2f875   tangwang   translate
425
              raise HTTPException(status_code=500, detail="Translation failed")
6f7840cf   tangwang   refactor: rename ...
426
  
cd4ce66d   tangwang   trans logs
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
          latency_ms = (time.perf_counter() - request_started) * 1000
          logger.info(
              "Translation response | model=%s scene=%s count=1 first_result=%s latency_ms=%.2f",
              model,
              scene,
              _result_preview(translated_text),
              latency_ms,
          )
          verbose_logger.info(
              "Translation response detail | model=%s scene=%s translated=%s latency_ms=%.2f",
              model,
              scene,
              translated_text,
              latency_ms,
          )
  
768ad710   tangwang   MySQL到ES字段映射说明-业务...
443
          return TranslationResponse(
6f7840cf   tangwang   refactor: rename ...
444
              text=raw_text,
768ad710   tangwang   MySQL到ES字段映射说明-业务...
445
446
447
              target_lang=request.target_lang,
              source_lang=request.source_lang,
              translated_text=translated_text,
3cd09b3b   tangwang   翻译接口改为调用qwen-mt-f...
448
              status="success",
5e4dc8e4   tangwang   翻译架构按“一个翻译服务 +
449
450
              model=str(getattr(translator, "model", model)),
              scene=scene,
768ad710   tangwang   MySQL到ES字段映射说明-业务...
451
452
          )
      
cd4ce66d   tangwang   trans logs
453
454
455
456
457
458
459
460
      except HTTPException as exc:
          latency_ms = (time.perf_counter() - request_started) * 1000
          logger.warning(
              "Translation request failed | status_code=%s detail=%s latency_ms=%.2f",
              exc.status_code,
              exc.detail,
              latency_ms,
          )
768ad710   tangwang   MySQL到ES字段映射说明-业务...
461
          raise
0fd2f875   tangwang   translate
462
      except ValueError as e:
cd4ce66d   tangwang   trans logs
463
          latency_ms = (time.perf_counter() - request_started) * 1000
14e67b71   tangwang   分句后的 batching 现在是...
464
          logger.warning("Translation validation error | error=%s latency_ms=%.2f", e, latency_ms)
0fd2f875   tangwang   translate
465
          raise HTTPException(status_code=400, detail=str(e)) from e
768ad710   tangwang   MySQL到ES字段映射说明-业务...
466
      except Exception as e:
cd4ce66d   tangwang   trans logs
467
468
          latency_ms = (time.perf_counter() - request_started) * 1000
          logger.error("Translation error | error=%s latency_ms=%.2f", e, latency_ms, exc_info=True)
0fd2f875   tangwang   translate
469
          raise HTTPException(status_code=500, detail=f"Translation error: {str(e)}")
14e67b71   tangwang   分句后的 batching 现在是...
470
471
      finally:
          reset_translation_request_id(request_token)
768ad710   tangwang   MySQL到ES字段映射说明-业务...
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
  
  
  @app.get("/")
  async def root():
      """Root endpoint with API information."""
      return {
          "service": "Translation Service API",
          "version": "1.0.0",
          "status": "running",
          "endpoints": {
              "translate": "POST /translate",
              "health": "GET /health",
              "docs": "GET /docs"
          }
      }
  
  
  if __name__ == "__main__":
      parser = argparse.ArgumentParser(description='Start translation API service')
      parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
      parser.add_argument('--port', type=int, default=6006, help='Port to bind to')
      parser.add_argument('--reload', action='store_true', help='Enable auto-reload')
      args = parser.parse_args()
  
      # Run server
      uvicorn.run(
          "api.translator_app:app",
          host=args.host,
          port=args.port,
          reload=args.reload
      )