#!/usr/bin/env python3 from __future__ import annotations import argparse import json import math import random import sys from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterable, List, Sequence import numpy as np import pandas as pd import torch from sklearn.model_selection import GroupKFold from sklearn.preprocessing import StandardScaler PROJECT_ROOT = Path(__file__).resolve().parents[2] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from scripts.evaluation.eval_framework.constants import ( DEFAULT_ARTIFACT_ROOT, RELEVANCE_GRADE_MAP, RELEVANCE_LV0, RELEVANCE_LV1, RELEVANCE_LV2, RELEVANCE_LV3, ) from scripts.evaluation.eval_framework.metrics import aggregate_metrics, compute_query_metrics from scripts.evaluation.eval_framework.store import EvalStore from scripts.evaluation.eval_framework.utils import ensure_dir, utc_timestamp LABELS_BY_GRADE = { 3: RELEVANCE_LV3, 2: RELEVANCE_LV2, 1: RELEVANCE_LV1, 0: RELEVANCE_LV0, } @dataclass class FoldArtifacts: fold_id: int train_queries: list[str] test_queries: list[str] best_epoch: int pair_count_train: int pair_count_eval: int metrics_fm: dict[str, float] metrics_baseline: dict[str, dict[str, float]] class FactorizationMachine(torch.nn.Module): def __init__(self, num_features: int, k: int) -> None: super().__init__() self.bias = torch.nn.Parameter(torch.zeros(1)) self.linear = torch.nn.Parameter(torch.zeros(num_features)) self.v = torch.nn.Parameter(torch.empty(num_features, k)) torch.nn.init.xavier_uniform_(self.v) def forward(self, x: torch.Tensor) -> torch.Tensor: linear_term = self.bias + x @ self.linear xv = x @ self.v x2v2 = (x * x) @ (self.v * self.v) interactions = 0.5 * torch.sum(xv * xv - x2v2, dim=1) return linear_term + interactions def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Offline LTR fitting from backend_verbose.log using FM + RankNet loss") parser.add_argument("--tenant-id", default="163") parser.add_argument("--log-path", default=str(PROJECT_ROOT / "logs" / "backend_verbose.log")) parser.add_argument( "--db-path", default=str(DEFAULT_ARTIFACT_ROOT / "search_eval.sqlite3"), ) parser.add_argument("--top-k", type=int, default=100) parser.add_argument("--folds", type=int, default=5) parser.add_argument("--epochs", type=int, default=60) parser.add_argument("--batch-size", type=int, default=4096) parser.add_argument("--lr", type=float, default=0.01) parser.add_argument("--weight-decay", type=float, default=1e-5) parser.add_argument("--fm-dim", type=int, default=8) parser.add_argument("--seed", type=int, default=20260402) parser.add_argument("--holdout-query-count", type=int, default=10) return parser def set_seed(seed: int) -> None: random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) def choose_holdout_queries(queries: Sequence[str], holdout_count: int, seed: int) -> tuple[list[str], list[str]]: uniq = list(dict.fromkeys(queries)) if len(uniq) <= 1: return uniq, [] count = max(1, min(int(holdout_count), len(uniq) - 1)) rng = random.Random(seed) shuffled = uniq[:] rng.shuffle(shuffled) holdout = sorted(shuffled[:count]) holdout_set = set(holdout) train = sorted(q for q in uniq if q not in holdout_set) return train, holdout def split_by_queries( df: pd.DataFrame, train_queries: Sequence[str], test_queries: Sequence[str], ) -> tuple[pd.DataFrame, pd.DataFrame]: train_df = df[df["query"].isin(set(train_queries))].copy() test_df = df[df["query"].isin(set(test_queries))].copy() return train_df, test_df def _json_payload_from_log_line(line: str) -> dict[str, Any] | None: marker = " | backend.verbose | " if marker not in line: return None try: return json.loads(line.split(marker, 1)[1]) except json.JSONDecodeError: return None def load_log_rows(log_path: Path, top_k: int) -> pd.DataFrame: rows: list[dict[str, Any]] = [] for raw_line in log_path.read_text(encoding="utf-8").splitlines(): if '"event":"search_response"' not in raw_line: continue payload = _json_payload_from_log_line(raw_line) if not payload: continue response = payload.get("response") or {} debug_info = response.get("debug_info") or {} query_analysis = debug_info.get("query_analysis") or {} query = ( query_analysis.get("original_query") or response.get("query_info", {}).get("original_query") or response.get("query_info", {}).get("rewritten_query") ) if not query: continue retrieval_plan = debug_info.get("retrieval_plan") or {} text_knn_plan = retrieval_plan.get("text_knn") or {} image_knn_plan = retrieval_plan.get("image_knn") or {} for row in (debug_info.get("per_result") or [])[:top_k]: ltr = dict(row.get("ltr_features") or {}) ranking_funnel = row.get("ranking_funnel") or {} coarse = (ranking_funnel.get("coarse_rank") or {}).get("ltr_features") or {} rerank = (ranking_funnel.get("rerank") or {}).get("ltr_features") or {} final_rank = int(row.get("final_rank") or top_k + 1) initial_rank = int(row.get("initial_rank") or final_rank) coarse_rank = (ranking_funnel.get("coarse_rank") or {}).get("rank") rerank_rank = (ranking_funnel.get("rerank") or {}).get("rank") title_zh = None title_multilingual = row.get("title_multilingual") if isinstance(title_multilingual, dict): title_zh = title_multilingual.get("zh") or title_multilingual.get("en") rows.append( { "query": str(query), "spu_id": str(row.get("spu_id") or ""), "title": title_zh, "final_rank": final_rank, "initial_rank": initial_rank, "coarse_rank": None if coarse_rank is None else int(coarse_rank), "rerank_rank": None if rerank_rank is None else int(rerank_rank), "es_score_normalized": float(row.get("es_score_normalized") or 0.0), "coarse_score": float(row.get("coarse_score") or 0.0), "fused_score": float(row.get("fused_score") or row.get("score") or 0.0), "text_knn_enabled": float(bool(text_knn_plan.get("enabled"))), "text_knn_long_plan": float(bool(text_knn_plan.get("is_long_query_plan"))), "text_knn_k": float(text_knn_plan.get("k") or 0.0), "text_knn_num_candidates": float(text_knn_plan.get("num_candidates") or 0.0), "image_knn_enabled": float(bool(image_knn_plan.get("enabled"))), "image_knn_k": float(image_knn_plan.get("k") or 0.0), "image_knn_num_candidates": float(image_knn_plan.get("num_candidates") or 0.0), "coarse_stage_score": float(coarse.get("stage_score") or row.get("coarse_score") or 0.0), "rerank_stage_score": float(rerank.get("stage_score") or ltr.get("stage_score") or row.get("fused_score") or 0.0), **{k: _safe_float(v) for k, v in ltr.items()}, } ) df = pd.DataFrame(rows) if df.empty: raise RuntimeError(f"no search_response rows found in {log_path}") return df def _safe_float(value: Any) -> float: if value is None: return 0.0 if isinstance(value, bool): return float(value) try: if isinstance(value, (int, float)): if math.isnan(float(value)) or math.isinf(float(value)): return 0.0 return float(value) return float(value) except (TypeError, ValueError): return 0.0 def attach_labels(df: pd.DataFrame, store: EvalStore, tenant_id: str) -> pd.DataFrame: labels_by_query: dict[str, dict[str, str]] = {} rows: list[dict[str, Any]] = [] for query, group in df.groupby("query", sort=False): labels_by_query[query] = store.get_labels(tenant_id, query) label_map = labels_by_query[query] for row in group.to_dict("records"): label = label_map.get(row["spu_id"]) if label is None: continue row["label"] = label row["grade"] = int(RELEVANCE_GRADE_MAP.get(label, 0)) rows.append(row) labeled = pd.DataFrame(rows) if labeled.empty: raise RuntimeError("no labeled rows matched the log rows") return labeled def add_engineered_features(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]: base_numeric = [ "es_score", "text_score", "knn_score", "rerank_score", "fine_score", "source_score", "translation_score", "text_primary_score", "text_support_score", "text_knn_score", "image_knn_score", "knn_primary_score", "knn_support_score", "style_boost", "stage_score", "es_score_normalized", "coarse_score", "fused_score", "coarse_stage_score", "rerank_stage_score", "text_knn_k", "text_knn_num_candidates", "image_knn_k", "image_knn_num_candidates", ] base_binary = [ "has_text_match", "has_translation_match", "has_text_knn", "has_image_knn", "text_score_fallback_to_es", "has_style_boost", "text_knn_enabled", "text_knn_long_plan", "image_knn_enabled", ] out = df.copy() engineered: dict[str, Any] = {} feature_names: list[str] = [] for name in base_numeric: if name not in out.columns: out[name] = 0.0 out[name] = out[name].astype(float).fillna(0.0) positive = np.clip(out[name].to_numpy(dtype=np.float64), a_min=0.0, a_max=None) clipped = np.clip(positive, a_min=0.0, a_max=1e6) engineered[f"{name}__raw"] = clipped engineered[f"{name}__log1p"] = np.log1p(clipped) engineered[f"{name}__sqrt"] = np.sqrt(clipped) engineered[f"{name}__square"] = np.square(np.clip(clipped, 0.0, 1e3)) engineered[f"{name}__inv"] = 1.0 / (1.0 + clipped) feature_names.extend( [ f"{name}__raw", f"{name}__log1p", f"{name}__sqrt", f"{name}__square", f"{name}__inv", ] ) for name in base_binary: if name not in out.columns: out[name] = 0.0 out[name] = out[name].astype(float).fillna(0.0) feature_names.append(name) for rank_name in ("initial_rank", "coarse_rank", "rerank_rank", "final_rank"): if rank_name not in out.columns: out[rank_name] = 0.0 rank = out[rank_name].fillna(out["final_rank"]).astype(float) engineered[f"{rank_name}__inv"] = 1.0 / np.maximum(rank, 1.0) engineered[f"{rank_name}__log"] = np.log1p(np.maximum(rank, 1.0)) feature_names.extend([f"{rank_name}__inv", f"{rank_name}__log"]) eps = 1e-6 engineered["translation_share"] = out["translation_score"] / (out["text_score"] + eps) engineered["source_share"] = out["source_score"] / (out["text_score"] + eps) engineered["image_knn_share"] = out["image_knn_score"] / (out["knn_score"] + eps) engineered["text_knn_share"] = out["text_knn_score"] / (out["knn_score"] + eps) engineered["rerank_x_text"] = out["rerank_score"] * out["text_score"] engineered["rerank_x_knn"] = out["rerank_score"] * out["knn_score"] engineered["rerank_x_es"] = out["rerank_score"] * out["es_score"] engineered["text_minus_es"] = out["text_score"] - out["es_score"] engineered["knn_minus_text"] = out["knn_score"] - out["text_score"] engineered["coarse_minus_rerank"] = out["coarse_stage_score"] - out["rerank_stage_score"] feature_names.extend( [ "translation_share", "source_share", "image_knn_share", "text_knn_share", "rerank_x_text", "rerank_x_knn", "rerank_x_es", "text_minus_es", "knn_minus_text", "coarse_minus_rerank", ] ) out = pd.concat([out, pd.DataFrame(engineered, index=out.index)], axis=1) out[feature_names] = out[feature_names].replace([np.inf, -np.inf], 0.0).fillna(0.0) return out, feature_names def build_pair_indices(grades: np.ndarray, qids: np.ndarray) -> np.ndarray: pairs: list[list[int]] = [] for qid in np.unique(qids): idx = np.flatnonzero(qids == qid) g = grades[idx] order = np.argsort(-g) idx = idx[order] g = g[order] for left in range(len(idx)): for right in range(left + 1, len(idx)): if g[left] <= g[right]: continue pairs.append([int(idx[left]), int(idx[right])]) if not pairs: return np.zeros((0, 2), dtype=np.int64) return np.asarray(pairs, dtype=np.int64) def train_one_fold( *, x_train: np.ndarray, grades_train: np.ndarray, qids_train: np.ndarray, x_eval: np.ndarray, grades_eval: np.ndarray, qids_eval: np.ndarray, fm_dim: int, epochs: int, batch_size: int, lr: float, weight_decay: float, seed: int, ) -> tuple[FactorizationMachine, int, int, int]: set_seed(seed) model = FactorizationMachine(num_features=x_train.shape[1], k=fm_dim) optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay) train_pairs = build_pair_indices(grades_train, qids_train) eval_pairs = build_pair_indices(grades_eval, qids_eval) if train_pairs.size == 0: raise RuntimeError("train fold contains no valid label-different pairs") x_train_t = torch.tensor(x_train, dtype=torch.float32) x_eval_t = torch.tensor(x_eval, dtype=torch.float32) pair_train_t = torch.tensor(train_pairs, dtype=torch.long) pair_eval_t = torch.tensor(eval_pairs, dtype=torch.long) best_state = None best_epoch = 0 best_eval_loss = float("inf") for epoch in range(1, epochs + 1): model.train() perm = torch.randperm(pair_train_t.shape[0]) pair_train_t = pair_train_t[perm] for start in range(0, pair_train_t.shape[0], batch_size): batch_pairs = pair_train_t[start : start + batch_size] scores = model(x_train_t) diff = scores[batch_pairs[:, 0]] - scores[batch_pairs[:, 1]] loss = torch.nn.functional.softplus(-diff).mean() optimizer.zero_grad() loss.backward() optimizer.step() current_eval_loss = evaluate_pair_loss(model, x_eval_t, pair_eval_t) if current_eval_loss <= best_eval_loss: best_eval_loss = current_eval_loss best_epoch = epoch best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()} if best_state is not None: model.load_state_dict(best_state) return model, best_epoch, int(train_pairs.shape[0]), int(eval_pairs.shape[0]) def evaluate_pair_loss(model: FactorizationMachine, x: torch.Tensor, pairs: torch.Tensor) -> float: if pairs.numel() == 0: return 0.0 model.eval() with torch.no_grad(): scores = model(x) diff = scores[pairs[:, 0]] - scores[pairs[:, 1]] return float(torch.nn.functional.softplus(-diff).mean().item()) def rank_and_score(group: pd.DataFrame, score_column: str) -> list[str]: ranked = group.sort_values(score_column, ascending=False, kind="mergesort") return [LABELS_BY_GRADE[int(grade)] for grade in ranked["grade"].tolist()] def compute_group_metrics(df: pd.DataFrame, score_column: str) -> dict[str, float]: per_query_metrics: list[dict[str, float]] = [] for _query, group in df.groupby("query", sort=False): ranked_labels = rank_and_score(group, score_column=score_column) ideal_labels = [ LABELS_BY_GRADE[int(grade)] for grade in sorted(group["grade"].tolist(), reverse=True) ] per_query_metrics.append(compute_query_metrics(ranked_labels, ideal_labels=ideal_labels)) return aggregate_metrics(per_query_metrics) def summarize_dataset(df: pd.DataFrame) -> dict[str, Any]: query_sizes = df.groupby("query")["spu_id"].count() grade_counts = df["grade"].value_counts().sort_index().to_dict() pair_count = 0 for _query, group in df.groupby("query", sort=False): grades = group["grade"].to_numpy(dtype=np.int64) for left in range(len(grades)): for right in range(left + 1, len(grades)): if grades[left] != grades[right]: pair_count += 1 return { "query_count": int(df["query"].nunique()), "doc_count": int(len(df)), "avg_docs_per_query": round(float(query_sizes.mean()), 3), "min_docs_per_query": int(query_sizes.min()), "max_docs_per_query": int(query_sizes.max()), "grade_counts": {str(k): int(v) for k, v in grade_counts.items()}, "pair_count": int(pair_count), } def fit_final_model( *, x: np.ndarray, grades: np.ndarray, qids: np.ndarray, fm_dim: int, epochs: int, batch_size: int, lr: float, weight_decay: float, seed: int, ) -> tuple[FactorizationMachine, int, int]: model, best_epoch, pair_count_train, _ = train_one_fold( x_train=x, grades_train=grades, qids_train=qids, x_eval=x, grades_eval=grades, qids_eval=qids, fm_dim=fm_dim, epochs=epochs, batch_size=batch_size, lr=lr, weight_decay=weight_decay, seed=seed, ) return model, best_epoch, pair_count_train def export_feature_importance( model: FactorizationMachine, feature_names: Sequence[str], output_dir: Path, *, top_k_interactions: int = 300, ) -> dict[str, str]: linear = model.linear.detach().cpu().numpy() v = model.v.detach().cpu().numpy() linear_rows: list[dict[str, Any]] = [] for feature_name, weight in zip(feature_names, linear, strict=False): linear_rows.append( { "feature": feature_name, "weight": float(weight), "importance": float(abs(weight)), } ) linear_rows.sort(key=lambda row: row["importance"], reverse=True) linear_path = output_dir / "feature_importance_linear.csv" pd.DataFrame(linear_rows).to_csv(linear_path, index=False, encoding="utf-8") interaction_rows: list[dict[str, Any]] = [] for left_idx, left_name in enumerate(feature_names): left_vec = v[left_idx] for right_idx in range(left_idx + 1, len(feature_names)): right_name = feature_names[right_idx] interaction_weight = float(np.dot(left_vec, v[right_idx])) interaction_rows.append( { "feature_left": left_name, "feature_right": right_name, "interaction_feature": f"{left_name} * {right_name}", "weight": interaction_weight, "importance": abs(interaction_weight), } ) interaction_rows.sort(key=lambda row: row["importance"], reverse=True) interaction_path = output_dir / "feature_importance_interactions.csv" pd.DataFrame(interaction_rows[:top_k_interactions]).to_csv(interaction_path, index=False, encoding="utf-8") return { "linear_importance_path": str(linear_path), "interaction_importance_path": str(interaction_path), } def main() -> None: args = build_parser().parse_args() set_seed(args.seed) log_path = Path(args.log_path) db_path = Path(args.db_path) run_id = f"offline_ltr_{utc_timestamp()}" output_dir = ensure_dir(DEFAULT_ARTIFACT_ROOT / "ltr_runs" / run_id) store = EvalStore(db_path) raw_df = load_log_rows(log_path=log_path, top_k=args.top_k) labeled_df = attach_labels(raw_df, store=store, tenant_id=str(args.tenant_id)) feat_df, feature_names = add_engineered_features(labeled_df) feat_df = feat_df.reset_index(drop=True) queries = feat_df["query"].drop_duplicates().tolist() query_to_id = {query: idx for idx, query in enumerate(queries)} qids = feat_df["query"].map(query_to_id).to_numpy(dtype=np.int64) grades = feat_df["grade"].to_numpy(dtype=np.int64) x_all = feat_df[feature_names].to_numpy(dtype=np.float64) train_queries_holdout, test_queries_holdout = choose_holdout_queries( queries, holdout_count=args.holdout_query_count, seed=args.seed, ) baseline_metrics = { "current_fused_score": compute_group_metrics(feat_df, "fused_score"), "rerank_score_only": compute_group_metrics(feat_df, "rerank_score"), "es_score_only": compute_group_metrics(feat_df, "es_score"), "text_score_only": compute_group_metrics(feat_df, "text_score"), } splitter = GroupKFold(n_splits=min(args.folds, len(queries))) folds: list[FoldArtifacts] = [] fold_metric_items: list[dict[str, float]] = [] for fold_id, (train_idx, test_idx) in enumerate(splitter.split(x_all, grades, groups=qids), start=1): scaler = StandardScaler() x_train = scaler.fit_transform(x_all[train_idx]) x_test = scaler.transform(x_all[test_idx]) model, best_epoch, pair_count_train, pair_count_eval = train_one_fold( x_train=x_train, grades_train=grades[train_idx], qids_train=qids[train_idx], x_eval=x_test, grades_eval=grades[test_idx], qids_eval=qids[test_idx], fm_dim=args.fm_dim, epochs=args.epochs, batch_size=args.batch_size, lr=args.lr, weight_decay=args.weight_decay, seed=args.seed + fold_id, ) with torch.no_grad(): fm_scores = model(torch.tensor(x_test, dtype=torch.float32)).cpu().numpy() fold_df = feat_df.iloc[test_idx].copy() fold_df = fold_df.assign(fm_score=fm_scores) metrics_fm = compute_group_metrics(fold_df, "fm_score") fold_metric_items.append(metrics_fm) folds.append( FoldArtifacts( fold_id=fold_id, train_queries=sorted(feat_df.iloc[train_idx]["query"].unique().tolist()), test_queries=sorted(feat_df.iloc[test_idx]["query"].unique().tolist()), best_epoch=best_epoch, pair_count_train=pair_count_train, pair_count_eval=pair_count_eval, metrics_fm=metrics_fm, metrics_baseline={ "current_fused_score": compute_group_metrics(fold_df, "fused_score"), "rerank_score_only": compute_group_metrics(fold_df, "rerank_score"), "es_score_only": compute_group_metrics(fold_df, "es_score"), "text_score_only": compute_group_metrics(fold_df, "text_score"), }, ) ) cv_metrics = aggregate_metrics(fold_metric_items) holdout_train_df, holdout_test_df = split_by_queries( feat_df, train_queries=train_queries_holdout, test_queries=test_queries_holdout, ) holdout_metrics: dict[str, Any] | None = None if not holdout_test_df.empty: holdout_query_to_id = { query: idx for idx, query in enumerate(holdout_train_df["query"].drop_duplicates().tolist()) } holdout_train_qids = holdout_train_df["query"].map(holdout_query_to_id).to_numpy(dtype=np.int64) holdout_x_train = holdout_train_df[feature_names].to_numpy(dtype=np.float64) holdout_grades_train = holdout_train_df["grade"].to_numpy(dtype=np.int64) holdout_x_test = holdout_test_df[feature_names].to_numpy(dtype=np.float64) holdout_grades_test = holdout_test_df["grade"].to_numpy(dtype=np.int64) holdout_test_qids = holdout_test_df["query"].astype("category").cat.codes.to_numpy(dtype=np.int64) holdout_scaler = StandardScaler() holdout_x_train = holdout_scaler.fit_transform(holdout_x_train) holdout_x_test = holdout_scaler.transform(holdout_x_test) holdout_model, holdout_best_epoch, holdout_pair_count_train, holdout_pair_count_eval = train_one_fold( x_train=holdout_x_train, grades_train=holdout_grades_train, qids_train=holdout_train_qids, x_eval=holdout_x_test, grades_eval=holdout_grades_test, qids_eval=holdout_test_qids, fm_dim=args.fm_dim, epochs=args.epochs, batch_size=args.batch_size, lr=args.lr, weight_decay=args.weight_decay, seed=args.seed + 5000, ) with torch.no_grad(): holdout_scores = holdout_model(torch.tensor(holdout_x_test, dtype=torch.float32)).cpu().numpy() holdout_test_df = holdout_test_df.assign(fm_score=holdout_scores) holdout_metrics = { "train_query_count": len(train_queries_holdout), "test_query_count": len(test_queries_holdout), "train_queries": train_queries_holdout, "test_queries": test_queries_holdout, "best_epoch": holdout_best_epoch, "pair_count_train": holdout_pair_count_train, "pair_count_eval": holdout_pair_count_eval, "metrics_fm": compute_group_metrics(holdout_test_df, "fm_score"), "metrics_baseline": { "current_fused_score": compute_group_metrics(holdout_test_df, "fused_score"), "rerank_score_only": compute_group_metrics(holdout_test_df, "rerank_score"), "es_score_only": compute_group_metrics(holdout_test_df, "es_score"), "text_score_only": compute_group_metrics(holdout_test_df, "text_score"), }, } final_scaler = StandardScaler() x_final = final_scaler.fit_transform(x_all) final_model, final_best_epoch, final_pair_count = fit_final_model( x=x_final, grades=grades, qids=qids, fm_dim=args.fm_dim, epochs=args.epochs, batch_size=args.batch_size, lr=args.lr, weight_decay=args.weight_decay, seed=args.seed + 999, ) with torch.no_grad(): final_scores = final_model(torch.tensor(x_final, dtype=torch.float32)).cpu().numpy() feat_df = feat_df.copy() feat_df = feat_df.assign(fm_score=final_scores) final_metrics = compute_group_metrics(feat_df, "fm_score") model_payload = { "feature_names": feature_names, "scaler_mean": final_scaler.mean_.tolist(), "scaler_scale": final_scaler.scale_.tolist(), "fm_state_dict": {k: v.detach().cpu().tolist() for k, v in final_model.state_dict().items()}, "seed": args.seed, "fm_dim": args.fm_dim, "best_epoch": final_best_epoch, } model_path = output_dir / "fm_ranknet_model.json" model_path.write_text(json.dumps(model_payload, ensure_ascii=False), encoding="utf-8") importance_paths = export_feature_importance(final_model, feature_names, output_dir) pred_cols = [ "query", "spu_id", "title", "label", "grade", "final_rank", "fused_score", "rerank_score", "es_score", "text_score", "knn_score", "fm_score", ] feat_df[pred_cols].to_csv(output_dir / "predictions.csv", index=False, encoding="utf-8") summary = { "run_id": run_id, "tenant_id": str(args.tenant_id), "log_path": str(log_path), "db_path": str(db_path), "dataset": summarize_dataset(feat_df), "config": { "top_k": args.top_k, "folds": args.folds, "epochs": args.epochs, "batch_size": args.batch_size, "lr": args.lr, "weight_decay": args.weight_decay, "fm_dim": args.fm_dim, "seed": args.seed, "holdout_query_count": args.holdout_query_count, "feature_count": len(feature_names), }, "baseline_metrics": baseline_metrics, "cv_metrics": cv_metrics, "holdout_test_metrics": holdout_metrics, "final_metrics_all_queries": final_metrics, "folds": [ { "fold_id": fold.fold_id, "train_query_count": len(fold.train_queries), "test_query_count": len(fold.test_queries), "best_epoch": fold.best_epoch, "pair_count_train": fold.pair_count_train, "pair_count_eval": fold.pair_count_eval, "metrics_fm": fold.metrics_fm, "metrics_baseline": fold.metrics_baseline, "test_queries": fold.test_queries, } for fold in folds ], "artifacts": { "model_path": str(model_path), "predictions_path": str(output_dir / "predictions.csv"), **importance_paths, }, "final_pair_count": final_pair_count, } summary_path = output_dir / "summary.json" summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") print(f"run_id={run_id}") print(f"summary_path={summary_path}") print(f"model_path={model_path}") print("dataset=", json.dumps(summary["dataset"], ensure_ascii=False)) print("baseline_current_fused_score=", json.dumps(baseline_metrics["current_fused_score"], ensure_ascii=False)) print("cv_metrics_fm=", json.dumps(cv_metrics, ensure_ascii=False)) print("holdout_test_metrics=", json.dumps(holdout_metrics, ensure_ascii=False)) print("final_metrics_fm_all_queries=", json.dumps(final_metrics, ensure_ascii=False)) if __name__ == "__main__": main()