#!/usr/bin/env python3 """ DEPRECATED NAME (kept for backward compatibility). The input `products_data/*.xlsx` files are **Amazon-format exports** (with Parent/Child ASIN), not “competitor data”. Please use: - `scripts/amazon_xlsx_to_shoplazza_xlsx.py` This script keeps the same logic but updates user-facing naming gradually. """ import os import re import sys import argparse from datetime import datetime from collections import defaultdict, Counter from pathlib import Path from openpyxl import load_workbook # Allow running as `python scripts/xxx.py` without installing as a package sys.path.insert(0, str(Path(__file__).resolve().parent)) from shoplazza_excel_template import create_excel_from_template PREFERRED_OPTION_KEYS = [ "Size", "Color", "Style", "Pattern", "Material", "Flavor", "Scent", "Pack", "Pack of", "Number of Items", "Count", "Capacity", "Length", "Width", "Height", "Model", "Configuration", ] def clean_str(v): if v is None: return "" return str(v).strip() def html_escape(s): s = clean_str(s) return (s.replace("&", "&") .replace("<", "<") .replace(">", ">")) def generate_handle(title): """ Generate URL-friendly handle from title (ASCII only). Keep consistent with existing scripts. """ handle = clean_str(title).lower() handle = re.sub(r"[^a-z0-9\\s-]", "", handle) handle = re.sub(r"[-\\s]+", "-", handle).strip("-") if len(handle) > 255: handle = handle[:255] return handle or "product" def parse_date_to_template(dt_value): """ Template expects: YYYY-MM-DD HH:MM:SS Input could be "2018-05-09" or datetime/date. """ if dt_value is None or dt_value == "": return "" if isinstance(dt_value, datetime): return dt_value.strftime("%Y-%m-%d %H:%M:%S") s = clean_str(dt_value) # common formats for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S"): try: d = datetime.strptime(s, fmt) return d.strftime("%Y-%m-%d %H:%M:%S") except Exception: pass return "" def parse_weight(weight_conv, weight_raw): """ Return (weight_value, unit) where unit in {kg, lb, g, oz}. Prefer '商品重量(单位换算)' like '68.04 g'. Fallback to '商品重量' like '0.15 pounds'. """ s = clean_str(weight_conv) or clean_str(weight_raw) if not s: return ("", "") m = re.search(r"([0-9]+(?:\\.[0-9]+)?)\\s*([a-zA-Z]+)", s) if not m: return ("", "") val = float(m.group(1)) unit = m.group(2).lower() if unit in ("g", "gram", "grams"): return (val, "g") if unit in ("kg", "kilogram", "kilograms"): return (val, "kg") if unit in ("lb", "lbs", "pound", "pounds"): return (val, "lb") if unit in ("oz", "ounce", "ounces"): return (val, "oz") return ("", "") def parse_dimensions_inches(dim_raw): """ Template '尺寸信息': 'L,W,H' in inches. Input example: '7.9 x 7.9 x 2 inches' """ s = clean_str(dim_raw) if not s: return "" # extract first 3 numbers in order nums = re.findall(r"([0-9]+(?:\\.[0-9]+)?)", s) if len(nums) < 3: return "" return "{},{},{}".format(nums[0], nums[1], nums[2]) def parse_sku_options(sku_text): """ Parse 'SKU' column into {key: value}. Example: 'Size: One Size | Color: Black' -> {'Size':'One Size','Color':'Black'} """ s = clean_str(sku_text) if not s: return {} parts = [p.strip() for p in s.split("|") if p.strip()] out = {} for p in parts: if ":" not in p: continue k, v = p.split(":", 1) k = clean_str(k) v = clean_str(v) if k and v: out[k] = v return out def choose_option_keys(variant_dicts, max_keys=3): """ Choose up to 3 option keys for a product group. Order by preference list first, then by frequency. """ freq = Counter() for d in variant_dicts: for k, v in d.items(): if v: freq[k] += 1 if not freq: return [] preferred_rank = {k: i for i, k in enumerate(PREFERRED_OPTION_KEYS)} def key_sort(k): return (preferred_rank.get(k, 10 ** 6), -freq[k], k.lower()) keys = sorted(freq.keys(), key=key_sort) return keys[:max_keys] def build_description_html(title, details, product_url): parts = [] if title: parts.append("
{}
".format(html_escape(title))) detail_items = [x.strip() for x in clean_str(details).split("|") if x.strip()] if detail_items: li = "".join(["Source: {0}
'.format(html_escape(product_url))) return "".join(parts) def competitor_sheet(ws): """ Build (header->col_index) for competitor sheet. Assumes header is row 1. """ headers = [] for c in range(1, ws.max_column + 1): v = ws.cell(1, c).value headers.append(clean_str(v)) idx = {h: i + 1 for i, h in enumerate(headers) if h} return idx def read_competitor_rows_from_file(xlsx_path, max_rows=None): wb = load_workbook(xlsx_path, read_only=True, data_only=True) # pick first non-Notes sheet sheet_name = None for name in wb.sheetnames: if str(name).lower() == "notes": continue sheet_name = name break if sheet_name is None: return [] ws = wb[sheet_name] idx = competitor_sheet(ws) required = ["ASIN", "父ASIN", "商品标题", "商品主图", "SKU", "详细参数", "价格($)", "prime价格($)", "上架时间", "类目路径", "大类目", "小类目", "品牌", "品牌链接", "商品详情页链接", "商品重量(单位换算)", "商品重量", "商品尺寸"] for k in required: if k not in idx: raise RuntimeError("Missing column '{}' in {} sheet {}".format(k, xlsx_path, sheet_name)) rows = [] end_row = ws.max_row if max_rows is not None: end_row = min(end_row, 1 + int(max_rows)) for r in range(2, end_row + 1): asin = clean_str(ws.cell(r, idx["ASIN"]).value) if not asin: continue parent = clean_str(ws.cell(r, idx["父ASIN"]).value) or asin row = { "ASIN": asin, "父ASIN": parent, "SKU": clean_str(ws.cell(r, idx["SKU"]).value), "详细参数": clean_str(ws.cell(r, idx["详细参数"]).value), "商品标题": clean_str(ws.cell(r, idx["商品标题"]).value), "商品主图": clean_str(ws.cell(r, idx["商品主图"]).value), "价格($)": ws.cell(r, idx["价格($)"]).value, "prime价格($)": ws.cell(r, idx["prime价格($)"]).value, "上架时间": clean_str(ws.cell(r, idx["上架时间"]).value), "类目路径": clean_str(ws.cell(r, idx["类目路径"]).value), "大类目": clean_str(ws.cell(r, idx["大类目"]).value), "小类目": clean_str(ws.cell(r, idx["小类目"]).value), "品牌": clean_str(ws.cell(r, idx["品牌"]).value), "品牌链接": clean_str(ws.cell(r, idx["品牌链接"]).value), "商品详情页链接": clean_str(ws.cell(r, idx["商品详情页链接"]).value), "商品重量(单位换算)": clean_str(ws.cell(r, idx["商品重量(单位换算)"]).value), "商品重量": clean_str(ws.cell(r, idx["商品重量"]).value), "商品尺寸": clean_str(ws.cell(r, idx["商品尺寸"]).value), } rows.append(row) return rows def to_price(v): if v is None or v == "": return None try: return float(v) except Exception: s = clean_str(v) m = re.search(r"([0-9]+(?:\\.[0-9]+)?)", s) if not m: return None return float(m.group(1)) def build_common_fields(base_row, spu_id): title = base_row.get("商品标题") or "Product" brand = base_row.get("品牌") or "" big_cat = base_row.get("大类目") or "" small_cat = base_row.get("小类目") or "" cat_path = base_row.get("类目路径") or "" handle = generate_handle(title) if handle and not handle.startswith("products/"): handle = "products/{}".format(handle) seo_title = title seo_desc_parts = [] if brand: seo_desc_parts.append(brand) seo_desc_parts.append(title) if big_cat: seo_desc_parts.append(big_cat) seo_description = " ".join([x for x in seo_desc_parts if x])[:5000] seo_keywords = ",".join([x for x in [title, brand, big_cat, small_cat] if x]) tags = ",".join([x for x in [brand, big_cat, small_cat] if x]) created_at = parse_date_to_template(base_row.get("上架时间")) description = build_description_html( title=title, details=base_row.get("详细参数"), product_url=base_row.get("商品详情页链接"), ) # default inventory settings (data source has no stock) inventory_qty = 100 weight_val, weight_unit = parse_weight(base_row.get("商品重量(单位换算)"), base_row.get("商品重量")) size_info = parse_dimensions_inches(base_row.get("商品尺寸")) album = big_cat or "" if not album and cat_path: album = cat_path.split(":")[0] common = { "商品ID": "", "创建时间": created_at, "商品标题*": title[:255], "商品副标题": "{} {}".format(brand, big_cat).strip()[:600], "商品描述": description, "SEO标题": seo_title[:5000], "SEO描述": seo_description, "SEO URL Handle": handle, "SEO URL 重定向": "N", "SEO关键词": seo_keywords[:5000], "商品上架": "Y", "需要物流": "Y", "商品收税": "N", "商品spu": spu_id[:100], "启用虚拟销量": "N", "虚拟销量值": "", "跟踪库存": "Y", "库存规则*": "1", "专辑名称": album, "标签": tags, "供应商名称": "Amazon", "供应商URL": base_row.get("商品详情页链接") or base_row.get("品牌链接") or "", "商品重量": weight_val if weight_val != "" else "", "重量单位": weight_unit, "商品库存": inventory_qty, "尺寸信息": size_info, "原产地国别": "", "HS(协调制度)代码": "", "商品备注": "ASIN:{}; ParentASIN:{}; CategoryPath:{}".format( base_row.get("ASIN", ""), spu_id, (cat_path[:200] if cat_path else "") )[:500], "款式备注": "", } return common def build_s_row(base_row): spu_id = base_row.get("父ASIN") or base_row.get("ASIN") common = build_common_fields(base_row, spu_id=spu_id) price = to_price(base_row.get("prime价格($)")) or to_price(base_row.get("价格($)")) or 9.99 image = base_row.get("商品主图") or "" row = {} row.update(common) row.update({ "商品属性*": "S", "款式1": "", "款式2": "", "款式3": "", "商品售价*": price, "商品原价": price, "成本价": "", "商品SKU": base_row.get("ASIN") or "", "商品条形码": "", "商品图片*": image, "商品主图": image, }) return row def build_m_p_rows(variant_rows): """ variant_rows: List[dict] with same 父ASIN. """ base = variant_rows[0] spu_id = base.get("父ASIN") or base.get("ASIN") common = build_common_fields(base, spu_id=spu_id) option_dicts = [parse_sku_options(v.get("SKU")) for v in variant_rows] option_keys = choose_option_keys(option_dicts, max_keys=3) if not option_keys: option_keys = ["Variant"] # M row m = {} m.update(common) m.update({ "商品属性*": "M", "款式1": option_keys[0] if len(option_keys) > 0 else "", "款式2": option_keys[1] if len(option_keys) > 1 else "", "款式3": option_keys[2] if len(option_keys) > 2 else "", "商品售价*": "", "商品原价": "", "成本价": "", "商品SKU": "", "商品条形码": "", "商品图片*": base.get("商品主图") or "", "商品主图": base.get("商品主图") or "", }) # For M row, these SKU-level fields should be empty per template guidance m["商品重量"] = "" m["重量单位"] = "" m["商品库存"] = "" m["尺寸信息"] = "" rows = [m] # P rows for v in variant_rows: v_common = build_common_fields(v, spu_id=spu_id) # wipe SPU-only fields for P row v_common.update({ "商品副标题": "", "商品描述": "", "SEO标题": "", "SEO描述": "", "SEO URL Handle": "", "SEO URL 重定向": "", "SEO关键词": "", "专辑名称": "", "标签": "", "供应商名称": "", "供应商URL": "", "商品备注": "", }) opt = parse_sku_options(v.get("SKU")) if option_keys == ["Variant"]: opt_vals = [v.get("ASIN")] else: opt_vals = [opt.get(k, "") for k in option_keys] price = to_price(v.get("prime价格($)")) or to_price(v.get("价格($)")) or 9.99 image = v.get("商品主图") or "" p = {} p.update(v_common) p.update({ "商品属性*": "P", "款式1": opt_vals[0] if len(opt_vals) > 0 else "", "款式2": opt_vals[1] if len(opt_vals) > 1 else "", "款式3": opt_vals[2] if len(opt_vals) > 2 else "", "商品售价*": price, "商品原价": price, "成本价": "", "商品SKU": v.get("ASIN") or "", "商品条形码": "", # P row supports one variant image; we use variant's main image "商品图片*": image, "商品主图": "", }) rows.append(p) return rows def main(): parser = argparse.ArgumentParser(description="Convert Amazon-format xlsx files to Shoplazza import xlsx (deprecated script name)") parser.add_argument("--input-dir", default="data/mai_jia_jing_ling/products_data", help="Directory containing Amazon-format xlsx files") parser.add_argument("--template", default="docs/商品导入模板.xlsx", help="Shoplazza import template xlsx") parser.add_argument("--output", default="amazon_shoplazza_import.xlsx", help="Output xlsx file path") parser.add_argument("--max-files", type=int, default=None, help="Limit number of xlsx files to read (for testing)") parser.add_argument("--max-rows-per-file", type=int, default=None, help="Limit rows per xlsx file (for testing)") parser.add_argument("--max-products", type=int, default=None, help="Limit number of SPU groups to output (for testing)") args = parser.parse_args() input_dir = args.input_dir if not os.path.isdir(input_dir): raise RuntimeError("input-dir not found: {}".format(input_dir)) if not os.path.exists(args.template): raise RuntimeError("template not found: {}".format(args.template)) files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.lower().endswith(".xlsx")] files.sort() if args.max_files is not None: files = files[: int(args.max_files)] print("Reading Amazon-format files: {} (from {})".format(len(files), input_dir), flush=True) groups = defaultdict(list) # spu_id -> [variant rows] seen_asin = set() for fp in files: print(" - loading: {}".format(fp), flush=True) try: rows = read_competitor_rows_from_file(fp, max_rows=args.max_rows_per_file) except Exception as e: print("WARN: failed to read {}: {}".format(fp, e)) continue print(" loaded rows: {}".format(len(rows)), flush=True) for r in rows: asin = r.get("ASIN") if asin in seen_asin: continue seen_asin.add(asin) spu_id = r.get("父ASIN") or asin groups[spu_id].append(r) print("Collected variants: {}, SPU groups: {}".format(len(seen_asin), len(groups)), flush=True) excel_rows = [] spu_count = 0 for spu_id, variants in groups.items(): if not variants: continue spu_count += 1 if args.max_products is not None and spu_count > int(args.max_products): break if len(variants) == 1: excel_rows.append(build_s_row(variants[0])) else: excel_rows.extend(build_m_p_rows(variants)) print("Generated Excel rows: {} (SPU groups output: {})".format(len(excel_rows), min(spu_count, len(groups))), flush=True) create_excel_from_template(args.template, args.output, excel_rows) if __name__ == "__main__": main()