diff --git a/docs/亚马逊格式数据转店匠商品导入模板.md b/docs/亚马逊格式数据转店匠商品导入模板.md
index 11c24d3..7c60870 100644
--- a/docs/亚马逊格式数据转店匠商品导入模板.md
+++ b/docs/亚马逊格式数据转店匠商品导入模板.md
@@ -116,6 +116,12 @@ python scripts/amazon_xlsx_to_shoplazza_xlsx.py \
   --max-files 1 --max-rows-per-file 2000 --max-products 50
 ```
 
+### Performance notes (important)
+
+- The old implementation read and wrote cell by cell with `ws.cell()`; processing even a single xlsx file could be very slow (minutes or longer).
+- The current script already uses **`iter_rows(values_only=True)`** for fast reads and enables the **fast writer** by default (output is not written into the template cell by cell).
+- To fall back to the slow "write into the template cell by cell" mode (not recommended), add: `--no-fast-write`
+
 ### 2)生成全量
 
 ```bash
diff --git a/scripts/amazon_xlsx_to_shoplazza_xlsx.py b/scripts/amazon_xlsx_to_shoplazza_xlsx.py
index 90f2e90..276a45b 100644
--- a/scripts/amazon_xlsx_to_shoplazza_xlsx.py
+++ b/scripts/amazon_xlsx_to_shoplazza_xlsx.py
@@ -36,7 +36,7 @@ from openpyxl import load_workbook
 
 # Allow running as `python scripts/xxx.py` without installing as a package
 sys.path.insert(0, str(Path(__file__).resolve().parent))
-from shoplazza_excel_template import create_excel_from_template
+from shoplazza_excel_template import create_excel_from_template, create_excel_from_template_fast
 
 
 PREFERRED_OPTION_KEYS = [
@@ -210,35 +210,39 @@ def read_amazon_rows_from_file(xlsx_path, max_rows=None):
         if k not in idx:
             raise RuntimeError("Missing column '{}' in {} sheet {}".format(k, xlsx_path, sheet_name))
 
+    # OPT: use iter_rows(values_only=True) instead of ws.cell() per field.
+    # openpyxl cell access is relatively expensive; values_only is much faster.
+    pos = {k: idx[k] - 1 for k in required}  # 0-based positions in row tuple
+
     rows = []
     end_row = ws.max_row
     if max_rows is not None:
         end_row = min(end_row, 1 + int(max_rows))
 
-    for r in range(2, end_row + 1):
-        asin = clean_str(ws.cell(r, idx["ASIN"]).value)
+    for tup in ws.iter_rows(min_row=2, max_row=end_row, values_only=True):
+        asin = clean_str(tup[pos["ASIN"]])
         if not asin:
             continue
-        parent = clean_str(ws.cell(r, idx["父ASIN"]).value) or asin
+        parent = clean_str(tup[pos["父ASIN"]]) or asin
         rows.append({
             "ASIN": asin,
             "父ASIN": parent,
-            "SKU": clean_str(ws.cell(r, idx["SKU"]).value),
-            "详细参数": clean_str(ws.cell(r, idx["详细参数"]).value),
-            "商品标题": clean_str(ws.cell(r, idx["商品标题"]).value),
-            "商品主图": clean_str(ws.cell(r, idx["商品主图"]).value),
-            "价格($)": ws.cell(r, idx["价格($)"]).value,
-            "prime价格($)": ws.cell(r, idx["prime价格($)"]).value,
-            "上架时间": clean_str(ws.cell(r, idx["上架时间"]).value),
-            "类目路径": clean_str(ws.cell(r, idx["类目路径"]).value),
-            "大类目": clean_str(ws.cell(r, idx["大类目"]).value),
-            "小类目": clean_str(ws.cell(r, idx["小类目"]).value),
-            "品牌": clean_str(ws.cell(r, idx["品牌"]).value),
-            "品牌链接": clean_str(ws.cell(r, idx["品牌链接"]).value),
-            "商品详情页链接": clean_str(ws.cell(r, idx["商品详情页链接"]).value),
-            "商品重量(单位换算)": clean_str(ws.cell(r, idx["商品重量(单位换算)"]).value),
-            "商品重量": clean_str(ws.cell(r, idx["商品重量"]).value),
-            "商品尺寸": clean_str(ws.cell(r, idx["商品尺寸"]).value),
+            "SKU": clean_str(tup[pos["SKU"]]),
+            "详细参数": clean_str(tup[pos["详细参数"]]),
+            "商品标题": clean_str(tup[pos["商品标题"]]),
+            "商品主图": clean_str(tup[pos["商品主图"]]),
+            "价格($)": tup[pos["价格($)"]],
+            "prime价格($)": tup[pos["prime价格($)"]],
+            "上架时间": clean_str(tup[pos["上架时间"]]),
+            "类目路径": clean_str(tup[pos["类目路径"]]),
+            "大类目": clean_str(tup[pos["大类目"]]),
+            "小类目": clean_str(tup[pos["小类目"]]),
+            "品牌": clean_str(tup[pos["品牌"]]),
+            "品牌链接": clean_str(tup[pos["品牌链接"]]),
+            "商品详情页链接": clean_str(tup[pos["商品详情页链接"]]),
+            "商品重量(单位换算)": clean_str(tup[pos["商品重量(单位换算)"]]),
+            "商品重量": clean_str(tup[pos["商品重量"]]),
+            "商品尺寸": clean_str(tup[pos["商品尺寸"]]),
         })
     return rows
 
@@ -417,6 +421,7 @@ def main():
     parser.add_argument("--input-dir",
default="data/mai_jia_jing_ling/products_data", help="Directory containing Amazon-format xlsx files") parser.add_argument("--template", default="docs/商品导入模板.xlsx", help="Shoplazza import template xlsx") parser.add_argument("--output", default="amazon_shoplazza_import.xlsx", help="Output xlsx file path") + parser.add_argument("--no-fast-write", action="store_true", help="Disable fast writer (use template cell-by-cell write; slower)") parser.add_argument("--max-files", type=int, default=None, help="Limit number of xlsx files to read (for testing)") parser.add_argument("--max-rows-per-file", type=int, default=None, help="Limit rows per xlsx file (for testing)") parser.add_argument("--max-products", type=int, default=None, help="Limit number of SPU groups to output (for testing)") @@ -471,7 +476,10 @@ def main(): excel_rows.extend(build_m_p_rows(variants)) print("Generated Excel rows: {} (SPU groups output: {})".format(len(excel_rows), min(spu_count, len(groups))), flush=True) - create_excel_from_template(args.template, args.output, excel_rows) + if args.no_fast_write: + create_excel_from_template(args.template, args.output, excel_rows) + else: + create_excel_from_template_fast(args.template, args.output, excel_rows) if __name__ == "__main__": diff --git a/scripts/competitor_xlsx_to_shoplazza_xlsx.py b/scripts/competitor_xlsx_to_shoplazza_xlsx.py index 1048503..5812357 100644 --- a/scripts/competitor_xlsx_to_shoplazza_xlsx.py +++ b/scripts/competitor_xlsx_to_shoplazza_xlsx.py @@ -1,514 +1,27 @@ #!/usr/bin/env python3 """ -DEPRECATED NAME (kept for backward compatibility). +DEPRECATED SCRIPT NAME (kept for backward compatibility). -The input `products_data/*.xlsx` files are **Amazon-format exports** (with Parent/Child ASIN), -not “competitor data”. Please use: +The input `data/mai_jia_jing_ling/products_data/*.xlsx` files are Amazon-format exports +(Parent/Child ASIN), not “competitor data”. +Please use: - `scripts/amazon_xlsx_to_shoplazza_xlsx.py` -This script keeps the same logic but updates user-facing naming gradually. +This wrapper simply forwards all CLI args to the correctly named script, so you +automatically get the latest performance improvements (fast read/write). """ -import os -import re import sys -import argparse -from datetime import datetime -from collections import defaultdict, Counter from pathlib import Path -from openpyxl import load_workbook - # Allow running as `python scripts/xxx.py` without installing as a package sys.path.insert(0, str(Path(__file__).resolve().parent)) -from shoplazza_excel_template import create_excel_from_template - - -PREFERRED_OPTION_KEYS = [ - "Size", "Color", "Style", "Pattern", "Material", "Flavor", "Scent", - "Pack", "Pack of", "Number of Items", "Count", "Capacity", "Length", - "Width", "Height", "Model", "Configuration", -] - - -def clean_str(v): - if v is None: - return "" - return str(v).strip() - - -def html_escape(s): - s = clean_str(s) - return (s.replace("&", "&") - .replace("<", "<") - .replace(">", ">")) - - -def generate_handle(title): - """ - Generate URL-friendly handle from title (ASCII only). - Keep consistent with existing scripts. - """ - handle = clean_str(title).lower() - handle = re.sub(r"[^a-z0-9\\s-]", "", handle) - handle = re.sub(r"[-\\s]+", "-", handle).strip("-") - if len(handle) > 255: - handle = handle[:255] - return handle or "product" - - -def parse_date_to_template(dt_value): - """ - Template expects: YYYY-MM-DD HH:MM:SS - Input could be "2018-05-09" or datetime/date. 
- """ - if dt_value is None or dt_value == "": - return "" - if isinstance(dt_value, datetime): - return dt_value.strftime("%Y-%m-%d %H:%M:%S") - s = clean_str(dt_value) - # common formats - for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S"): - try: - d = datetime.strptime(s, fmt) - return d.strftime("%Y-%m-%d %H:%M:%S") - except Exception: - pass - return "" - - -def parse_weight(weight_conv, weight_raw): - """ - Return (weight_value, unit) where unit in {kg, lb, g, oz}. - Prefer '商品重量(单位换算)' like '68.04 g'. - Fallback to '商品重量' like '0.15 pounds'. - """ - s = clean_str(weight_conv) or clean_str(weight_raw) - if not s: - return ("", "") - m = re.search(r"([0-9]+(?:\\.[0-9]+)?)\\s*([a-zA-Z]+)", s) - if not m: - return ("", "") - val = float(m.group(1)) - unit = m.group(2).lower() - if unit in ("g", "gram", "grams"): - return (val, "g") - if unit in ("kg", "kilogram", "kilograms"): - return (val, "kg") - if unit in ("lb", "lbs", "pound", "pounds"): - return (val, "lb") - if unit in ("oz", "ounce", "ounces"): - return (val, "oz") - return ("", "") - - -def parse_dimensions_inches(dim_raw): - """ - Template '尺寸信息': 'L,W,H' in inches. - Input example: '7.9 x 7.9 x 2 inches' - """ - s = clean_str(dim_raw) - if not s: - return "" - # extract first 3 numbers in order - nums = re.findall(r"([0-9]+(?:\\.[0-9]+)?)", s) - if len(nums) < 3: - return "" - return "{},{},{}".format(nums[0], nums[1], nums[2]) - - -def parse_sku_options(sku_text): - """ - Parse 'SKU' column into {key: value}. - Example: - 'Size: One Size | Color: Black' -> {'Size':'One Size','Color':'Black'} - """ - s = clean_str(sku_text) - if not s: - return {} - parts = [p.strip() for p in s.split("|") if p.strip()] - out = {} - for p in parts: - if ":" not in p: - continue - k, v = p.split(":", 1) - k = clean_str(k) - v = clean_str(v) - if k and v: - out[k] = v - return out - - -def choose_option_keys(variant_dicts, max_keys=3): - """ - Choose up to 3 option keys for a product group. - Order by preference list first, then by frequency. - """ - freq = Counter() - for d in variant_dicts: - for k, v in d.items(): - if v: - freq[k] += 1 - if not freq: - return [] - - preferred_rank = {k: i for i, k in enumerate(PREFERRED_OPTION_KEYS)} - - def key_sort(k): - return (preferred_rank.get(k, 10 ** 6), -freq[k], k.lower()) - - keys = sorted(freq.keys(), key=key_sort) - return keys[:max_keys] - - -def build_description_html(title, details, product_url): - parts = [] - if title: - parts.append("

<h2>{}</h2>".format(html_escape(title))) - detail_items = [x.strip() for x in clean_str(details).split("|") if x.strip()] - if detail_items: - li = "".join(["<li>{}</li>".format(html_escape(x)) for x in detail_items[:30]]) - parts.append("<ul>{}</ul>".format(li)) - if product_url: - parts.append('<p>Source: <a href="{0}">{0}</a></p>
    '.format(html_escape(product_url))) - return "".join(parts) - - -def competitor_sheet(ws): - """ - Build (header->col_index) for competitor sheet. - Assumes header is row 1. - """ - headers = [] - for c in range(1, ws.max_column + 1): - v = ws.cell(1, c).value - headers.append(clean_str(v)) - idx = {h: i + 1 for i, h in enumerate(headers) if h} - return idx - - -def read_competitor_rows_from_file(xlsx_path, max_rows=None): - wb = load_workbook(xlsx_path, read_only=True, data_only=True) - # pick first non-Notes sheet - sheet_name = None - for name in wb.sheetnames: - if str(name).lower() == "notes": - continue - sheet_name = name - break - if sheet_name is None: - return [] - ws = wb[sheet_name] - idx = competitor_sheet(ws) - - required = ["ASIN", "父ASIN", "商品标题", "商品主图", "SKU", "详细参数", "价格($)", "prime价格($)", - "上架时间", "类目路径", "大类目", "小类目", "品牌", "品牌链接", "商品详情页链接", - "商品重量(单位换算)", "商品重量", "商品尺寸"] - for k in required: - if k not in idx: - raise RuntimeError("Missing column '{}' in {} sheet {}".format(k, xlsx_path, sheet_name)) - - rows = [] - end_row = ws.max_row - if max_rows is not None: - end_row = min(end_row, 1 + int(max_rows)) - - for r in range(2, end_row + 1): - asin = clean_str(ws.cell(r, idx["ASIN"]).value) - if not asin: - continue - parent = clean_str(ws.cell(r, idx["父ASIN"]).value) or asin - row = { - "ASIN": asin, - "父ASIN": parent, - "SKU": clean_str(ws.cell(r, idx["SKU"]).value), - "详细参数": clean_str(ws.cell(r, idx["详细参数"]).value), - "商品标题": clean_str(ws.cell(r, idx["商品标题"]).value), - "商品主图": clean_str(ws.cell(r, idx["商品主图"]).value), - "价格($)": ws.cell(r, idx["价格($)"]).value, - "prime价格($)": ws.cell(r, idx["prime价格($)"]).value, - "上架时间": clean_str(ws.cell(r, idx["上架时间"]).value), - "类目路径": clean_str(ws.cell(r, idx["类目路径"]).value), - "大类目": clean_str(ws.cell(r, idx["大类目"]).value), - "小类目": clean_str(ws.cell(r, idx["小类目"]).value), - "品牌": clean_str(ws.cell(r, idx["品牌"]).value), - "品牌链接": clean_str(ws.cell(r, idx["品牌链接"]).value), - "商品详情页链接": clean_str(ws.cell(r, idx["商品详情页链接"]).value), - "商品重量(单位换算)": clean_str(ws.cell(r, idx["商品重量(单位换算)"]).value), - "商品重量": clean_str(ws.cell(r, idx["商品重量"]).value), - "商品尺寸": clean_str(ws.cell(r, idx["商品尺寸"]).value), - } - rows.append(row) - return rows - - -def to_price(v): - if v is None or v == "": - return None - try: - return float(v) - except Exception: - s = clean_str(v) - m = re.search(r"([0-9]+(?:\\.[0-9]+)?)", s) - if not m: - return None - return float(m.group(1)) - - -def build_common_fields(base_row, spu_id): - title = base_row.get("商品标题") or "Product" - brand = base_row.get("品牌") or "" - big_cat = base_row.get("大类目") or "" - small_cat = base_row.get("小类目") or "" - cat_path = base_row.get("类目路径") or "" - - handle = generate_handle(title) - if handle and not handle.startswith("products/"): - handle = "products/{}".format(handle) - - seo_title = title - seo_desc_parts = [] - if brand: - seo_desc_parts.append(brand) - seo_desc_parts.append(title) - if big_cat: - seo_desc_parts.append(big_cat) - seo_description = " ".join([x for x in seo_desc_parts if x])[:5000] - - seo_keywords = ",".join([x for x in [title, brand, big_cat, small_cat] if x]) - tags = ",".join([x for x in [brand, big_cat, small_cat] if x]) - - created_at = parse_date_to_template(base_row.get("上架时间")) - - description = build_description_html( - title=title, - details=base_row.get("详细参数"), - product_url=base_row.get("商品详情页链接"), - ) - - # default inventory settings (data source has no stock) - inventory_qty = 100 - - weight_val, weight_unit = 
parse_weight(base_row.get("商品重量(单位换算)"), base_row.get("商品重量")) - size_info = parse_dimensions_inches(base_row.get("商品尺寸")) - - album = big_cat or "" - if not album and cat_path: - album = cat_path.split(":")[0] - - common = { - "商品ID": "", - "创建时间": created_at, - "商品标题*": title[:255], - "商品副标题": "{} {}".format(brand, big_cat).strip()[:600], - "商品描述": description, - "SEO标题": seo_title[:5000], - "SEO描述": seo_description, - "SEO URL Handle": handle, - "SEO URL 重定向": "N", - "SEO关键词": seo_keywords[:5000], - "商品上架": "Y", - "需要物流": "Y", - "商品收税": "N", - "商品spu": spu_id[:100], - "启用虚拟销量": "N", - "虚拟销量值": "", - "跟踪库存": "Y", - "库存规则*": "1", - "专辑名称": album, - "标签": tags, - "供应商名称": "Amazon", - "供应商URL": base_row.get("商品详情页链接") or base_row.get("品牌链接") or "", - "商品重量": weight_val if weight_val != "" else "", - "重量单位": weight_unit, - "商品库存": inventory_qty, - "尺寸信息": size_info, - "原产地国别": "", - "HS(协调制度)代码": "", - "商品备注": "ASIN:{}; ParentASIN:{}; CategoryPath:{}".format( - base_row.get("ASIN", ""), spu_id, (cat_path[:200] if cat_path else "") - )[:500], - "款式备注": "", - } - return common - - -def build_s_row(base_row): - spu_id = base_row.get("父ASIN") or base_row.get("ASIN") - common = build_common_fields(base_row, spu_id=spu_id) - price = to_price(base_row.get("prime价格($)")) or to_price(base_row.get("价格($)")) or 9.99 - image = base_row.get("商品主图") or "" - - row = {} - row.update(common) - row.update({ - "商品属性*": "S", - "款式1": "", - "款式2": "", - "款式3": "", - "商品售价*": price, - "商品原价": price, - "成本价": "", - "商品SKU": base_row.get("ASIN") or "", - "商品条形码": "", - "商品图片*": image, - "商品主图": image, - }) - return row - - -def build_m_p_rows(variant_rows): - """ - variant_rows: List[dict] with same 父ASIN. - """ - base = variant_rows[0] - spu_id = base.get("父ASIN") or base.get("ASIN") - common = build_common_fields(base, spu_id=spu_id) - - option_dicts = [parse_sku_options(v.get("SKU")) for v in variant_rows] - option_keys = choose_option_keys(option_dicts, max_keys=3) - if not option_keys: - option_keys = ["Variant"] - - # M row - m = {} - m.update(common) - m.update({ - "商品属性*": "M", - "款式1": option_keys[0] if len(option_keys) > 0 else "", - "款式2": option_keys[1] if len(option_keys) > 1 else "", - "款式3": option_keys[2] if len(option_keys) > 2 else "", - "商品售价*": "", - "商品原价": "", - "成本价": "", - "商品SKU": "", - "商品条形码": "", - "商品图片*": base.get("商品主图") or "", - "商品主图": base.get("商品主图") or "", - }) - - # For M row, these SKU-level fields should be empty per template guidance - m["商品重量"] = "" - m["重量单位"] = "" - m["商品库存"] = "" - m["尺寸信息"] = "" - - rows = [m] - - # P rows - for v in variant_rows: - v_common = build_common_fields(v, spu_id=spu_id) - # wipe SPU-only fields for P row - v_common.update({ - "商品副标题": "", - "商品描述": "", - "SEO标题": "", - "SEO描述": "", - "SEO URL Handle": "", - "SEO URL 重定向": "", - "SEO关键词": "", - "专辑名称": "", - "标签": "", - "供应商名称": "", - "供应商URL": "", - "商品备注": "", - }) - - opt = parse_sku_options(v.get("SKU")) - if option_keys == ["Variant"]: - opt_vals = [v.get("ASIN")] - else: - opt_vals = [opt.get(k, "") for k in option_keys] - - price = to_price(v.get("prime价格($)")) or to_price(v.get("价格($)")) or 9.99 - image = v.get("商品主图") or "" - - p = {} - p.update(v_common) - p.update({ - "商品属性*": "P", - "款式1": opt_vals[0] if len(opt_vals) > 0 else "", - "款式2": opt_vals[1] if len(opt_vals) > 1 else "", - "款式3": opt_vals[2] if len(opt_vals) > 2 else "", - "商品售价*": price, - "商品原价": price, - "成本价": "", - "商品SKU": v.get("ASIN") or "", - "商品条形码": "", - # P row supports one variant image; we use variant's main 
image - "商品图片*": image, - "商品主图": "", - }) - rows.append(p) - - return rows - - -def main(): - parser = argparse.ArgumentParser(description="Convert Amazon-format xlsx files to Shoplazza import xlsx (deprecated script name)") - parser.add_argument("--input-dir", default="data/mai_jia_jing_ling/products_data", help="Directory containing Amazon-format xlsx files") - parser.add_argument("--template", default="docs/商品导入模板.xlsx", help="Shoplazza import template xlsx") - parser.add_argument("--output", default="amazon_shoplazza_import.xlsx", help="Output xlsx file path") - parser.add_argument("--max-files", type=int, default=None, help="Limit number of xlsx files to read (for testing)") - parser.add_argument("--max-rows-per-file", type=int, default=None, help="Limit rows per xlsx file (for testing)") - parser.add_argument("--max-products", type=int, default=None, help="Limit number of SPU groups to output (for testing)") - args = parser.parse_args() - - input_dir = args.input_dir - if not os.path.isdir(input_dir): - raise RuntimeError("input-dir not found: {}".format(input_dir)) - if not os.path.exists(args.template): - raise RuntimeError("template not found: {}".format(args.template)) - - files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.lower().endswith(".xlsx")] - files.sort() - if args.max_files is not None: - files = files[: int(args.max_files)] - - print("Reading Amazon-format files: {} (from {})".format(len(files), input_dir), flush=True) - - groups = defaultdict(list) # spu_id -> [variant rows] - seen_asin = set() - - for fp in files: - print(" - loading: {}".format(fp), flush=True) - try: - rows = read_competitor_rows_from_file(fp, max_rows=args.max_rows_per_file) - except Exception as e: - print("WARN: failed to read {}: {}".format(fp, e)) - continue - print(" loaded rows: {}".format(len(rows)), flush=True) - - for r in rows: - asin = r.get("ASIN") - if asin in seen_asin: - continue - seen_asin.add(asin) - spu_id = r.get("父ASIN") or asin - groups[spu_id].append(r) - - print("Collected variants: {}, SPU groups: {}".format(len(seen_asin), len(groups)), flush=True) - - excel_rows = [] - spu_count = 0 - - for spu_id, variants in groups.items(): - if not variants: - continue - spu_count += 1 - if args.max_products is not None and spu_count > int(args.max_products): - break - if len(variants) == 1: - excel_rows.append(build_s_row(variants[0])) - else: - excel_rows.extend(build_m_p_rows(variants)) - print("Generated Excel rows: {} (SPU groups output: {})".format(len(excel_rows), min(spu_count, len(groups))), flush=True) - create_excel_from_template(args.template, args.output, excel_rows) +from amazon_xlsx_to_shoplazza_xlsx import main as amazon_main if __name__ == "__main__": - main() + amazon_main() diff --git a/scripts/shoplazza_excel_template.py b/scripts/shoplazza_excel_template.py index 2005e04..c2bbec2 100644 --- a/scripts/shoplazza_excel_template.py +++ b/scripts/shoplazza_excel_template.py @@ -6,6 +6,7 @@ based on the provided template `docs/商品导入模板.xlsx`. We keep this in `scripts/` to maximize reuse by existing ad-hoc pipeline scripts. 
""" +from openpyxl import Workbook from openpyxl import load_workbook from openpyxl.styles import Alignment @@ -46,14 +47,15 @@ def create_excel_from_template(template_file, output_file, excel_rows, header_ro for col in range(1, ws.max_column + 1): ws.cell(row=row, column=col).value = None - # Write data rows + # Write data rows (OPT: only write fields that actually exist in excel_row) + # This avoids looping over all 42 template columns for every output row. for row_idx, excel_row in enumerate(excel_rows): excel_row_num = data_start_row + row_idx - for field_name, col_idx in column_mapping.items(): - if field_name not in excel_row: + for field_name, value in excel_row.items(): + col_idx = column_mapping.get(field_name) + if not col_idx: continue cell = ws.cell(row=excel_row_num, column=col_idx) - value = excel_row[field_name] cell.value = value if isinstance(value, str): cell.alignment = Alignment(vertical='top', wrap_text=True) @@ -65,3 +67,67 @@ def create_excel_from_template(template_file, output_file, excel_rows, header_ro print(" - Total rows: {}".format(len(excel_rows))) +def create_excel_from_template_fast(template_file, output_file, excel_rows, header_row_idx=2, data_start_row=4): + """ + Faster writer for large datasets. + + Instead of opening the template workbook in write mode and assigning cells one by one, + we: + - read the template's first (data_start_row-1) rows as values + - build a header->index mapping from header_row_idx + - create a new write_only workbook and append rows + + This is much faster for tens/hundreds of thousands of cells. + """ + tpl_wb = load_workbook(template_file, read_only=True, data_only=True) + tpl_ws = tpl_wb.active + + max_col = tpl_ws.max_column + + # Copy template "instruction" rows (typically rows 1-3) into output + prefix_rows = list(tpl_ws.iter_rows(min_row=1, max_row=data_start_row - 1, values_only=True)) + + header_values = None + if 1 <= header_row_idx <= len(prefix_rows): + header_values = prefix_rows[header_row_idx - 1] + else: + # Fallback: read header row directly + header_values = next(tpl_ws.iter_rows(min_row=header_row_idx, max_row=header_row_idx, values_only=True)) + + header_values = list(header_values)[:max_col] + col_map = {} + for i, v in enumerate(header_values): + if v is None: + continue + col_map[str(v).strip()] = i # 0-based + + wb = Workbook(write_only=True) + ws = wb.create_sheet(title=tpl_ws.title) + # remove default sheet if present (openpyxl may create one) + if "Sheet" in wb.sheetnames and wb["Sheet"] is not ws: + try: + wb.remove(wb["Sheet"]) + except Exception: + pass + + # Write prefix rows, normalized to max_col + for r in prefix_rows: + r = list(r)[:max_col] + if len(r) < max_col: + r = r + [None] * (max_col - len(r)) + ws.append(r) + + # Write data rows + for excel_row in excel_rows: + row_vals = [None] * max_col + for field_name, value in excel_row.items(): + if field_name not in col_map: + continue + row_vals[col_map[field_name]] = value + ws.append(row_vals) + + wb.save(output_file) + print("Excel file created (fast): {}".format(output_file)) + print(" - Total rows: {}".format(len(excel_rows))) + + -- libgit2 0.21.2