from __future__ import annotations import os import sys # ============================================================ # Rocketbot portable boot (py_libs) # ============================================================ base_dir = os.path.dirname(sys.executable) # ...\Rocketbot libs_dir = os.path.join(base_dir, "py_libs", "py310") sys.path.insert(0, libs_dir) # Rocketbot a veces ya trae pyparsing viejo en memoria for k in list(sys.modules.keys()): if k == "pyparsing" or k.startswith("pyparsing."): del sys.modules[k] def run(): """ v12.9 arrays -> tabla + totales + regla 'Durante Obra' (última cuota pre = 48HS) NUEVO (según pedido): - Si {plan_de_pagos} == 'Durante Obra': sumar todas las cuotas PRE (type_array = 'Pre entrega'), EXCEPTO la última cuota pre. Si esa suma: >= 80% de {valor_de_compra} y < 100% de {valor_compra} entonces: NO mostrar la fecha real de la última cuota pre; en su lugar escribir '48HS ANTES DE LA ENTREGA' (y vaciar VENC). (Incluye: merge FECHA DE PAGO + VENC en esa fila para que el texto se vea bien.) - TOTAL PAGADO / SALDO A PAGAR / VALOR TOTAL COMPRA: se calculan por fechas (<=hoy vs >hoy) y se guardan en vars Rocketbot: {total_pagado}, {saldo_a_pagar}, {valor_total_compra} Siempre con 2 decimales (sin '$'). - Orden: SEÑA -> ENTREGA INICIAL -> TITULO CUOTAS PRE -> resto, insertando TITULO CUOTAS POST al empezar post. Títulos y REFUERZOS en mayúscula. """ # Rocketbot scope-safety imports import json import re import ast import unicodedata import datetime as dt from decimal import Decimal, ROUND_HALF_UP # ---------------------------- # Rocketbot vars helpers # ---------------------------- def _missing(v) -> bool: if v is None: return True if isinstance(v, str): s = v.strip() return s == "" or s == "ERROR_NOT_VAR" return False def _gv(name, default=None): try: v = GetVar(name) except Exception: return default return default if _missing(v) else v def _gvs(name, default="") -> str: v = _gv(name, default) if v is None: return default s = str(v).strip() return default if (s == "" or s == "ERROR_NOT_VAR") else s def _sv(name, value): try: SetVar(name, value) except Exception: pass # ---------------------------- # Safe parsing of possibly huge list strings # ---------------------------- def _parse_any(v, default=None): if v is None: return default if isinstance(v, (list, dict)): return v s = str(v).strip() if s == "" or s == "ERROR_NOT_VAR": return default s = s.replace("\u200b", "").replace("\ufeff", "") # unwrap quoted json-ish if len(s) >= 2 and (s[0] == s[-1]) and s[0] in ("'", '"'): inner = s[1:-1].strip() if inner.startswith("[") or inner.startswith("{"): s = inner # json try: return json.loads(s) except Exception: pass # python literal fixed = re.sub(r"\bnull\b", "None", s, flags=re.I) fixed = re.sub(r"\btrue\b", "True", fixed, flags=re.I) fixed = re.sub(r"\bfalse\b", "False", fixed, flags=re.I) try: return ast.literal_eval(fixed) except Exception: return default def _split_top_level_list(s: str): """Split list-string into top-level elements without full parsing.""" if not isinstance(s, str): return [] txt = s.strip() if not txt.startswith("[") or not txt.endswith("]"): return [] inner = txt[1:-1].strip() if inner == "": return [] items, buf = [], [] depth_sq = 0 # [] depth_cu = 0 # {} in_str = False str_ch = "" esc = False def flush(): chunk = "".join(buf).strip() if chunk: items.append(chunk) buf.clear() for ch in inner: if in_str: buf.append(ch) if esc: esc = False continue if ch == "\\": # escape esc = True continue if ch == str_ch: in_str = False str_ch = "" continue if ch in ("'", '"'): in_str = True str_ch = ch buf.append(ch) continue if ch == "[": depth_sq += 1 buf.append(ch) continue if ch == "]": depth_sq = max(0, depth_sq - 1) buf.append(ch) continue if ch == "{": depth_cu += 1 buf.append(ch) continue if ch == "}": depth_cu = max(0, depth_cu - 1) buf.append(ch) continue if ch == "," and depth_sq == 0 and depth_cu == 0: flush() continue buf.append(ch) flush() return items def _as_list_any(raw): if raw is None: return [] if isinstance(raw, list): return raw parsed = _parse_any(raw, None) if isinstance(parsed, list): return parsed if isinstance(parsed, dict): return [parsed] if isinstance(raw, str): parts = _split_top_level_list(raw) if parts: return parts return [] # ---------------------------- # Extraction helpers # ---------------------------- DATE_RE1 = re.compile(r"\b(\d{1,2}/\d{1,2}/\d{4})\b") DATE_RE2 = re.compile(r"\b(\d{4}-\d{2}-\d{2})\b") DATE_RE3 = re.compile(r"\b(\d{8})\b") def _first_text(obj) -> str: if obj is None: return "" if isinstance(obj, str): s = obj.strip() return "" if s == "None" else s if isinstance(obj, (int, float, Decimal)): return str(obj).strip() if isinstance(obj, dict): for k in ("text", "name", "label", "value", "display_name"): if k in obj and obj.get(k) not in (None, ""): s = _first_text(obj.get(k)) if s: return s for v in obj.values(): s = _first_text(v) if s: return s return "" if isinstance(obj, (list, tuple)): for it in obj: s = _first_text(it) if s: return s return "" return str(obj).strip() def _extract_date(obj) -> str: """Find dd/mm/yyyy or yyyy-mm-dd inside nested structures or element-string.""" if obj is None: return "" if isinstance(obj, str): s = obj m = DATE_RE1.search(s) if m: return m.group(1) m = DATE_RE2.search(s) if m: return m.group(1) m = DATE_RE3.search(s) if m: return m.group(1) return s.strip() if isinstance(obj, dict): for k in ("text", "date", "value", "name"): if k in obj and obj.get(k) not in (None, ""): s = _extract_date(obj.get(k)) m = DATE_RE1.search(s) if m: return m.group(1) m = DATE_RE2.search(s) if m: return m.group(1) m = DATE_RE3.search(s) if m: return m.group(1) for v in obj.values(): s = _extract_date(v) m = DATE_RE1.search(s) if m: return m.group(1) m = DATE_RE2.search(s) if m: return m.group(1) m = DATE_RE3.search(s) if m: return m.group(1) return "" if isinstance(obj, (list, tuple)): for it in obj: s = _extract_date(it) m = DATE_RE1.search(s) if m: return m.group(1) m = DATE_RE2.search(s) if m: return m.group(1) m = DATE_RE3.search(s) if m: return m.group(1) try: blob = json.dumps(obj, ensure_ascii=False) except Exception: blob = str(obj) m = DATE_RE1.search(blob) if m: return m.group(1) m = DATE_RE2.search(blob) if m: return m.group(1) m = DATE_RE3.search(blob) if m: return m.group(1) return "" s = str(obj) m = DATE_RE1.search(s) if m: return m.group(1) m = DATE_RE2.search(s) if m: return m.group(1) m = DATE_RE3.search(s) if m: return m.group(1) return s.strip() # ---------------------------- # Money parsing/formatting # ---------------------------- def _to_decimal(val): if val is None: return None if isinstance(val, Decimal): return val if isinstance(val, int): return Decimal(val) if isinstance(val, float): return Decimal(str(val)) s = str(val).strip() if s == "" or s == "None": return None cleaned = [] for ch in s: if ch.isdigit() or ch in ".,-": cleaned.append(ch) s2 = "".join(cleaned) if s2 == "" or s2 in ("-", ",", "."): return None # "40.000,00" -> 40000.00 ; "40000.00" -> 40000.00 if ("," in s2) and ("." in s2): s2 = s2.replace(".", "").replace(",", ".") elif "," in s2: s2 = s2.replace(",", ".") try: return Decimal(s2) except Exception: return None def _format_es(d: Decimal, decimals: int) -> str: s = format(d, f",.{decimals}f") s = s.replace(",", "X").replace(".", ",").replace("X", ".") return s def _format_money_no_decimals(val): d = _to_decimal(val) if d is None: return "" d = d.quantize(Decimal("1"), rounding=ROUND_HALF_UP) s = format(d, ",.0f").replace(",", ".") return "$" + s def format_money_row(val): return _format_money_no_decimals(val) def format_money_total(val): return _format_money_no_decimals(val) def format_money_var_2dec(val): d = _to_decimal(val) if d is None: return "" return _format_es(d, 2) # ---------------------------- # Date parse/normalize # ---------------------------- def _to_date_any(val): if val is None: return None if isinstance(val, dt.datetime): return val.date() if isinstance(val, dt.date): return val s = str(val).strip() if s == "": return None m = re.match(r"^(\d{1,2})/(\d{1,2})/(\d{4})$", s) if m: try: dd = int(m.group(1)); mm = int(m.group(2)); yy = int(m.group(3)) return dt.date(yy, mm, dd) except Exception: pass m2 = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", s[:10]) if m2: try: return dt.date(int(m2.group(1)), int(m2.group(2)), int(m2.group(3))) except Exception: pass m3 = re.match(r"^(\d{4})(\d{2})(\d{2})$", s) if m3: try: return dt.date(int(m3.group(1)), int(m3.group(2)), int(m3.group(3))) except Exception: pass s2 = s.replace("T", " ").replace("Z", "") try: return dt.datetime.fromisoformat(s2).date() except Exception: return None def _fmt_ddmmyyyy(d, fallback: str) -> str: if d is None: return (fallback or "").strip() try: return d.strftime("%d/%m/%Y") except Exception: return (fallback or "").strip() # ---------------------------- # Concept classification / normalization # ---------------------------- def _ascii_norm(s): if s is None: return "" s = unicodedata.normalize("NFKD", str(s)) s = "".join(ch for ch in s if unicodedata.category(ch) != "Mn") return s.lower().strip() def _is_refuerzo(type_text: str, name_text: str) -> bool: t = _ascii_norm(type_text) n = _ascii_norm(name_text) return ("refuerzo" in t) or ("refuerzo" in n) or ("reforz" in t) or ("reforz" in n) def _is_sena(type_text: str, name_text: str) -> bool: t = _ascii_norm(type_text) n = _ascii_norm(name_text) return ("sena" in t) or ("sena" in n) or ("seña" in t) or ("seña" in n) def _is_entrega_inicial(type_text: str, name_text: str) -> bool: t = _ascii_norm(type_text) n = _ascii_norm(name_text) return ("entrega inicial" in t) or ("entrega inicial" in n) def _is_pre(type_text: str) -> bool: t = _ascii_norm(type_text) return ("pre entrega" in t) or ("preentrega" in t) or ("pre-entrega" in t) def _is_post(type_text: str) -> bool: t = _ascii_norm(type_text) return ("financi" in t) or ("post entrega" in t) or ("postentrega" in t) or ("post-entrega" in t) QUOTA_RE = re.compile(r"(?i)\bcuota\s*:\s*(\d+)") QUOTA_RE2 = re.compile(r"(?i)\bcuota\s*n[°o]\s*(\d+)") def _concept_from_texts(type_text: str, name_text: str) -> str: base = (name_text or type_text or "").strip() if not base: return "" m = QUOTA_RE.search(base) or QUOTA_RE2.search(base) if m: return f"CUOTA N° {m.group(1)}" return base.upper() # ============================================================ # Google Auth / Drive / Docs helpers (imports inside) # ============================================================ SCOPES = {scopes_api_google} def _load_json(path): with open(path, "r", encoding="utf-8") as f: return json.load(f) def get_services(credentials_json_path, token_json_path): from googleapiclient.discovery import build from google.oauth2 import service_account from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request info = _load_json(credentials_json_path) if isinstance(info, dict) and info.get("type") == "service_account": creds = service_account.Credentials.from_service_account_file(credentials_json_path, scopes=SCOPES) docs = build("docs", "v1", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False) return docs, drive, "service_account" creds = None if os.path.exists(token_json_path): creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) if (not creds) or (not creds.valid): if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) try: creds = flow.run_local_server(port=0) except Exception: creds = flow.run_console() os.makedirs(os.path.dirname(token_json_path), exist_ok=True) with open(token_json_path, "w", encoding="utf-8") as f: f.write(creds.to_json()) docs = build("docs", "v1", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False) return docs, drive, "oauth" def extract_doc_id_from_url(url): m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") return m.group(1) if m else "" def ensure_docs_api_compatible(drive_service, file_id: str): meta = drive_service.files().get( fileId=file_id, fields="id,name,mimeType,shortcutDetails", supportsAllDrives=True ).execute() if meta.get("mimeType") == "application/vnd.google-apps.shortcut": target = (meta.get("shortcutDetails") or {}).get("targetId", "") if not target: raise RuntimeError("Es shortcut pero no trae targetId.") file_id = target meta = drive_service.files().get( fileId=file_id, fields="id,name,mimeType", supportsAllDrives=True ).execute() if meta.get("mimeType") == "application/vnd.google-apps.document": return file_id, meta, False new_name = (meta.get("name") or "Documento") + " (Google Docs)" converted = drive_service.files().copy( fileId=file_id, body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, fields="id,name,mimeType", supportsAllDrives=True ).execute() return converted["id"], converted, True def docs_get(docs_service, doc_id: str): try: return docs_service.documents().get(documentId=doc_id, includeTabsContent=True).execute() except TypeError: return docs_service.documents().get(documentId=doc_id).execute() # ============================================================ # Marker search (tabs-aware) # ============================================================ def _walk_tabs(tabs): if not tabs: return for t in tabs: yield t for x in _walk_tabs(t.get("childTabs") or []): yield x def _list_scopes(doc): scopes = [("main", None)] tabs = doc.get("tabs") or [] for t in _walk_tabs(tabs): tid = (t.get("tabProperties") or {}).get("tabId") if tid is None: continue scopes.append(("tab", str(tid))) return scopes def _get_body_content_for_scope(doc, scope_kind, scope_id): if scope_kind == "main": return (doc.get("body") or {}).get("content", []) or [] tabs = doc.get("tabs") or [] for t in _walk_tabs(tabs): tid = (t.get("tabProperties") or {}).get("tabId") if tid is None: continue if str(tid) == str(scope_id): dtab = t.get("documentTab") or {} return (dtab.get("body") or {}).get("content", []) or [] return [] def _norm_alnum(s: str) -> str: if s is None: return "" s = unicodedata.normalize("NFKD", str(s)).lower() out = [] for ch in s: if unicodedata.category(ch) == "Mn": continue if ch.isalnum(): out.append(ch) return "".join(out) def _iter_text_chars(content_list): def walk(lst): for el in lst or []: if not isinstance(el, dict): continue p = el.get("paragraph") if p: base = el.get("startIndex") or 0 cursor = int(base) for pe in (p.get("elements") or []): tr = pe.get("textRun") if not tr: en = pe.get("endIndex") if en is not None: cursor = int(en) continue txt = tr.get("content", "") or "" st = pe.get("startIndex") or cursor st = int(st) for i, ch in enumerate(txt): if ch in ("\u200b", "\ufeff"): continue yield (st + i, ch) en = pe.get("endIndex") cursor = int(en) if en is not None else st + len(txt) continue t = el.get("table") if t: for row in (t.get("tableRows") or []): for cell in (row.get("tableCells") or []): yield from walk(cell.get("content", [])) continue toc = el.get("tableOfContents") if toc: yield from walk(toc.get("content", [])) continue yield from walk(content_list) def find_marker(doc, marker_raw: str): mr = (marker_raw or "").strip() candidates = [] for m in [mr, mr.strip("_"), "TABLA_PROFORMA", "__TABLA_PROFORMA__", "TABLA PROFORMA", "{TABLA_PROFORMA}", "{" + mr + "}"]: if m and m not in candidates: candidates.append(m) for scope_kind, scope_id in _list_scopes(doc): content = _get_body_content_for_scope(doc, scope_kind, scope_id) raw_chars, raw_map = [], [] for doc_i, ch in _iter_text_chars(content): raw_chars.append(ch) raw_map.append(int(doc_i)) raw_text = "".join(raw_chars) for m in candidates: pos = raw_text.find(m) if pos != -1: sdoc = raw_map[pos] edoc = raw_map[pos + len(m) - 1] + 1 return {"hit": {"scope_kind": scope_kind, "tabId": scope_id, "start": sdoc, "end": edoc, "used": m}, "candidates": candidates} pos2 = raw_text.lower().find(m.lower()) if pos2 != -1: sdoc = raw_map[pos2] edoc = raw_map[pos2 + len(m) - 1] + 1 return {"hit": {"scope_kind": scope_kind, "tabId": scope_id, "start": sdoc, "end": edoc, "used": m}, "candidates": candidates} alnum, alnum_map = [], [] for i, ch in enumerate(raw_chars): if ch.isalnum(): alnum.append(ch.lower()) alnum_map.append(raw_map[i]) alnum_text = "".join(alnum) for m in candidates: target = _norm_alnum(m) if not target: continue pos = alnum_text.find(target) if pos != -1: sdoc = alnum_map[pos] edoc = alnum_map[pos + len(target) - 1] + 1 return {"hit": {"scope_kind": scope_kind, "tabId": scope_id, "start": sdoc, "end": edoc, "used": m}, "candidates": candidates} return {"hit": None, "candidates": candidates} # ============================================================ # Docs batchUpdate helpers # ============================================================ def _loc(index, tab_id): d = {"index": int(index)} if tab_id is not None: d["tabId"] = str(tab_id) return d def _range(start, end, tab_id): d = {"startIndex": int(start), "endIndex": int(end)} if tab_id is not None: d["tabId"] = str(tab_id) return d def _rgb(r, g, b): return {"color": {"rgbColor": {"red": r, "green": g, "blue": b}}} def _merge_req(table_start, tab_id, row, col, row_span, col_span): loc = {"tableStartLocation": _loc(table_start, tab_id), "rowIndex": row, "columnIndex": col} tr = {"tableCellLocation": loc, "rowSpan": row_span, "columnSpan": col_span} return {"mergeTableCells": {"tableRange": tr}} def _cell_bg_req(table_start, tab_id, row, col, row_span, col_span, rgb): loc = {"tableStartLocation": _loc(table_start, tab_id), "rowIndex": row, "columnIndex": col} tr = {"tableCellLocation": loc, "rowSpan": row_span, "columnSpan": col_span} style = {"backgroundColor": rgb} return {"updateTableCellStyle": {"tableRange": tr, "tableCellStyle": style, "fields": "backgroundColor"}} def _cell_borders_req(table_start, tab_id, row, col, row_span, col_span, *, top=None, bottom=None, left=None, right=None): """Update borders for a table cell range. Pass border dicts or None (keeps existing).""" loc = {"tableStartLocation": _loc(table_start, tab_id), "rowIndex": row, "columnIndex": col} tr = {"tableCellLocation": loc, "rowSpan": row_span, "columnSpan": col_span} style = {} fields = [] if top is not None: style["borderTop"] = top fields.append("borderTop") if bottom is not None: style["borderBottom"] = bottom fields.append("borderBottom") if left is not None: style["borderLeft"] = left fields.append("borderLeft") if right is not None: style["borderRight"] = right fields.append("borderRight") if not fields: return None return {"updateTableCellStyle": {"tableRange": tr, "tableCellStyle": style, "fields": ",".join(fields)}} def _border_none(): # Border "invisible": width 0 return {"width": {"magnitude": 0, "unit": "PT"}, "dashStyle": "SOLID", "color": {"color": {"rgbColor": {"red": 1, "green": 1, "blue": 1}}}} def _batch_update(docs_service, doc_id, reqs, chunk=350): for i in range(0, len(reqs), chunk): docs_service.documents().batchUpdate( documentId=doc_id, body={"requests": reqs[i:i + chunk]} ).execute() def get_first_table_after_index(doc, scope_kind, tab_id, index_hint): body = _get_body_content_for_scope(doc, scope_kind, tab_id) best_table = None best_start = None for el in body: table = el.get("table") if not table: continue st = el.get("startIndex", 0) if st >= index_hint: if best_start is None or st < best_start: best_start = st best_table = table return (best_table, best_start) def get_cell_insert_index(table, r, c): cell = table["tableRows"][r]["tableCells"][c] content = cell.get("content", []) for el in content: if el.get("paragraph") is not None: en = el.get("endIndex") if en is not None: return int(en) - 1 for el in content: en = el.get("endIndex") if en is not None: return int(en) - 1 raise RuntimeError(f"No pude calcular indice de insercion para celda ({r},{c}).") def get_cell_text_range(table, r, c): """Return (min_start, max_end) indices for the given cell content.""" cell = table["tableRows"][r]["tableCells"][c] mins = None maxe = None for el in (cell.get("content") or []): st = el.get("startIndex") en = el.get("endIndex") if st is not None: mins = int(st) if mins is None else min(mins, int(st)) if en is not None: maxe = int(en) if maxe is None else max(maxe, int(en)) if mins is None or maxe is None: # fallback: use insert index idx = get_cell_insert_index(table, r, c) return idx, idx + 1 return mins, maxe # ============================================================ # Build rows # ============================================================ def _base_info_rows(data): rows = [] def add_info(label, value): rows.append([label, str(value) if value is not None else "", "", ""]) add_info("EDIFICIO:", data.get("proyecto", "")) add_info("DPTO.:", data.get("producto_odoo", "")) return rows def build_rows_from_arrays(data): type_array = _as_list_any(data.get("type_array")) name_array = _as_list_any(data.get("name_array")) date_array = _as_list_any(data.get("date_array")) amount_array = _as_list_any(data.get("amount_array")) lens = [len(type_array), len(name_array), len(date_array), len(amount_array)] total_len = min(lens) if lens else 0 _sv("dbg_total_len", str(total_len)) entries = [] for i in range(total_len): ttxt = _first_text(type_array[i]) ntxt = _first_text(name_array[i]) dtext = _extract_date(date_array[i]) atext = _first_text(amount_array[i]) amt = _to_decimal(atext) entries.append({"i": i, "type_text": ttxt, "name_text": ntxt, "date_text": dtext, "amount_dec": amt}) sena_item = next((e for e in entries if _is_sena(e["type_text"], e["name_text"])), None) entrega_item = next((e for e in entries if _is_entrega_inicial(e["type_text"], e["name_text"])), None) remaining = [] for e in entries: if sena_item is not None and e["i"] == sena_item["i"]: continue if entrega_item is not None and e["i"] == entrega_item["i"]: continue remaining.append(e) rows = _base_info_rows(data) header_row_index = len(rows) rows.append(["COSTO TOTAL", "MONTO", "FECHA DE PAGO", "VENC"]) cost_total_row_idx = len(rows) rows.append(["COSTO TOTAL", "", "", ""]) title_rows = [] refuerzo_rows = [] merge_fecha_venc_rows = [] today = dt.date.today() total_pagado = Decimal("0") saldo_a_pagar = Decimal("0") pre_quota_rows = [] # [(row_idx, amt_dec), ...] cuotas PRE (type Pre entrega, no refuerzo) def add_payment_row(concepto, amt_dec, date_text, mark_refuerzo=False, force_paid=False): nonlocal total_pagado, saldo_a_pagar d_obj = _to_date_any(date_text) shown = _fmt_ddmmyyyy(d_obj, date_text) fecha_pago = "" venc = "" if force_paid: fecha_pago = shown if shown else (str(date_text).strip() if date_text is not None else "") venc = "" else: if d_obj is not None: if d_obj <= today: fecha_pago = shown else: venc = shown else: venc = shown if shown else "SIN FECHA" monto_fmt = format_money_row(amt_dec) if (amt_dec is not None and amt_dec > 0) else "" if amt_dec is not None and amt_dec > 0: if force_paid or fecha_pago: total_pagado += amt_dec else: saldo_a_pagar += amt_dec rows.append([concepto, monto_fmt, fecha_pago, venc]) row_idx = len(rows) - 1 if mark_refuerzo: refuerzo_rows.append(row_idx) return row_idx # Orden fijo: SEÑA -> ENTREGA INICIAL if sena_item is not None: add_payment_row("SEÑA", sena_item["amount_dec"], sena_item["date_text"], False, True) if entrega_item is not None: add_payment_row("ENTREGA INICIAL", entrega_item["amount_dec"], entrega_item["date_text"], False, True) pre_title_added = False post_title_added = False ref_no = 1 for e in remaining: ttxt = e["type_text"] ntxt = e["name_text"] dtext = e["date_text"] amt_dec = e["amount_dec"] is_ref = _is_refuerzo(ttxt, ntxt) is_post = _is_post(ttxt) group = "post" if is_post else "pre" if group == "pre" and not pre_title_added: title_rows.append(len(rows)) rows.append(["CUOTAS PRE ENTREGA", "", "", ""]) pre_title_added = True if group == "post" and not post_title_added: title_rows.append(len(rows)) rows.append(["CUOTAS POST ENTREGA", "", "", ""]) post_title_added = True if is_ref: add_payment_row(f"{ref_no}° REFUERZO", amt_dec, dtext, True) ref_no += 1 else: concepto = _concept_from_texts(ttxt, ntxt) row_idx = add_payment_row(concepto, amt_dec, dtext, False) if _is_pre(ttxt): pre_quota_rows.append((row_idx, amt_dec)) # ---- Regla DURANTE OBRA ---- plan = _ascii_norm(data.get("plan_de_pagos", "")) is_durante_obra = (plan.replace(" ", "") == "duranteobra") base80 = _to_decimal(data.get("valor_de_compra")) base100 = _to_decimal(data.get("valor_compra")) # fallback: si no hay valor_compra, usar valor_total_compra calculado # (pero para la regla, si no hay base80 tampoco, no se aplica) if is_durante_obra and len(pre_quota_rows) >= 2 and base80 is not None: sum_pre_except_last = Decimal("0") for (_, amt) in pre_quota_rows[:-1]: if amt is not None and amt > 0: sum_pre_except_last += amt last_row_idx, _ = pre_quota_rows[-1] thr80 = base80 * Decimal("0.8") if base100 is None: # provisional: usar el total calculado más abajo pass else: if (sum_pre_except_last >= thr80) and (sum_pre_except_last < base100): rows[last_row_idx][2] = "48HS ANTES DE LA ENTREGA" rows[last_row_idx][3] = "" merge_fecha_venc_rows.append(last_row_idx) _sv("dbg_sum_pre_except_last", str(sum_pre_except_last)) _sv("dbg_thr80", str(thr80)) _sv("dbg_valor_de_compra_used", str(base80)) _sv("dbg_valor_compra_used", str(base100) if base100 is not None else "") # ---- Totales ---- valor_total_compra = total_pagado + saldo_a_pagar # si base100 faltaba, re-evaluar la condición usando valor_total_compra como 100% if is_durante_obra and len(pre_quota_rows) >= 2 and base80 is not None and base100 is None: base100 = valor_total_compra sum_pre_except_last = _to_decimal(_gvs("dbg_sum_pre_except_last", "0")) or Decimal("0") thr80 = base80 * Decimal("0.8") last_row_idx, _ = pre_quota_rows[-1] if (sum_pre_except_last >= thr80) and (sum_pre_except_last < base100): rows[last_row_idx][2] = "48HS ANTES DE LA ENTREGA" rows[last_row_idx][3] = "" if last_row_idx not in merge_fecha_venc_rows: merge_fecha_venc_rows.append(last_row_idx) _sv("dbg_valor_compra_used", str(base100)) rows[cost_total_row_idx][1] = format_money_total(valor_total_compra) total_pagado_row_idx = len(rows) rows.append(["TOTAL PAGADO", format_money_total(total_pagado), "", ""]) saldo_row_idx = len(rows) rows.append(["SALDO A PAGAR", format_money_total(saldo_a_pagar), "", ""]) # Filas debajo de la tabla (sin bordes): # 1) fila en blanco (separación) # 2) fila con línea de firma (guiones bajos) # 3) fila con texto COMPRADOR blank_row_idx = len(rows) rows.append(["", "", "", ""]) line_row_idx = len(rows) rows.append(["_______________________________________________", "", "", ""]) buyer_row_idx = len(rows) rows.append(["COMPRADOR", "", "", ""]) # Vars Rocketbot (sin $) _sv("total_pagado", format_money_var_2dec(total_pagado)) _sv("saldo_a_pagar", format_money_var_2dec(saldo_a_pagar)) _sv("valor_total_compra_financiado", format_money_var_2dec(valor_total_compra)) meta = { "header_row_index": header_row_index, "cost_total_row": cost_total_row_idx, "title_rows": title_rows, "refuerzo_rows": refuerzo_rows, "merge_fecha_venc_rows": merge_fecha_venc_rows, "total_pagado_row": total_pagado_row_idx, "saldo_row": saldo_row_idx, "blank_row": blank_row_idx, "line_row": line_row_idx, "signer_row": buyer_row_idx, } return rows, meta # ============================================================ # Insert table # ============================================================ def insert_table_at_marker(docs_service, doc_id, marker, rows, meta): doc = docs_get(docs_service, doc_id) found = find_marker(doc, marker) hit = (found or {}).get("hit") if not hit: _sv("gdoc_status", "ERROR") _sv("gdoc_error", "No encontre el marcador: " + str(marker)) raise RuntimeError("No encontre el marcador en el documento: " + str(marker)) scope_kind = hit.get("scope_kind", "main") tab_id = hit.get("tabId") start = int(hit["start"]) end = int(hit["end"]) reqs = [] reqs.append({"deleteContentRange": {"range": _range(start, end, tab_id)}}) # Layout base (4 columnas) COL_WIDTHS_PT = { 0: 150, # CONCEPTO 1: 85, # MONTO 2: 95, # FECHA DE PAGO 3: 95, # VENC } ROW_MIN_HEIGHT_PT = 11 CELL_PADDING_LR_PT = 1.5 CELL_PADDING_TB_PT = 0.5 def _dim_pt(dim, default=0.0): try: if not dim: return float(default) mag = dim.get("magnitude", default) return float(mag) except Exception: return float(default) # --- Center table on page (best-effort) --- # Google Docs UI supports table alignment/indent, but the Docs API doesn't expose a direct # 'tableAlignment' field. A common workaround is to set the paragraph indent at the insertion point. # (See: community/StackOverflow discussions about table alignment via updateParagraphStyle.) try: doc_style = (doc.get("documentStyle") or {}) page_w = _dim_pt(((doc_style.get("pageSize") or {}).get("width")), 612.0) m_left = _dim_pt(doc_style.get("marginLeft"), 72.0) m_right = _dim_pt(doc_style.get("marginRight"), 72.0) content_w = max(0.0, page_w - m_left - m_right) # Table width in points = sum of our explicit column widths (already set later). # If column widths are not set for all columns, fall back to 0 (no indent). table_w = 0.0 try: table_w = float(sum(COL_WIDTHS_PT.values())) except Exception: table_w = 0.0 indent = (content_w - table_w) / 2.0 if content_w > 0 and table_w > 0 else 0.0 if indent < 0: indent = 0.0 # Apply to the paragraph where the marker lives (range start..start+1 is enough to overlap it). reqs.append({ "updateParagraphStyle": { "range": _range(start, start + 1, tab_id), "paragraphStyle": { "indentStart": {"unit": "PT", "magnitude": indent}, "alignment": "START" }, "fields": "indentStart,alignment" } }) _sv("dbg_table_center_indent_pt", str(indent)) except Exception as _e: _sv("dbg_table_center_indent_pt", "0") reqs.append({"insertTable": {"rows": len(rows), "columns": 4, "location": _loc(start, tab_id)}}) _batch_update(docs_service, doc_id, reqs) doc2 = docs_get(docs_service, doc_id) table, table_start = get_first_table_after_index(doc2, scope_kind, tab_id, start) if not table: raise RuntimeError("Inserte la tabla pero no pude ubicarla al releer el doc.") # Fill text: high->low indices fill_items = [] for r in range(len(rows)): for c in range(4): txt = str(rows[r][c] or "") if txt.strip() == "": continue idx = get_cell_insert_index(table, r, c) fill_items.append((idx, {"insertText": {"location": _loc(idx, tab_id), "text": txt}})) if fill_items: fill_items.sort(key=lambda x: x[0], reverse=True) _batch_update(docs_service, doc_id, [req for _, req in fill_items]) # Releer para poder alinear/bordes en filas especiales (spacer + COMPRADOR) doc3 = docs_get(docs_service, doc_id) table3, _ = get_first_table_after_index(doc3, scope_kind, tab_id, start) if table3: try: blank_row = int(meta.get("blank_row")) line_row = int(meta.get("line_row")) sg_row = int(meta.get("signer_row")) # merge blank + line + buyer across all 4 columns style_extra = [] # Línea negra debajo de SALDO A PAGAR (antes de zona sin líneas) try: saldo_row = int(meta.get("saldo_row")) black = {"width": {"magnitude": 1.0, "unit": "PT"}, "dashStyle": "SOLID", "color": {"color": {"rgbColor": {"red": 0, "green": 0, "blue": 0}}}} br = _cell_borders_req(table_start, tab_id, saldo_row, 0, 1, 4, bottom=black) if br: style_extra.append(br) except Exception: pass # Alineación: # - Header (títulos) en col 2 y 3: IZQUIERDA (como el resto) # - Filas de datos en col 2 y 3: CENTRADO try: hdr_row = int(meta.get("header_row_index")) last_row = int(meta.get("saldo_row")) # hasta SALDO (inclusive) # Header: left for cc in (2, 3): st, en = get_cell_text_range(table3, hdr_row, cc) if st is not None and en is not None and int(en) > int(st): style_extra.append({ "updateParagraphStyle": { "range": _range(st, en, tab_id), "paragraphStyle": {"alignment": "START"}, "fields": "alignment", } }) # Data rows: center (desde hdr+1) for rr in range(hdr_row + 1, last_row + 1): for cc in (2, 3): st, en = get_cell_text_range(table3, rr, cc) if st is None or en is None or int(en) <= int(st): continue style_extra.append({ "updateParagraphStyle": { "range": _range(st, en, tab_id), "paragraphStyle": {"alignment": "CENTER"}, "fields": "alignment", } }) except Exception: pass style_extra.append(_merge_req(table_start, tab_id, blank_row, 0, 1, 4)) style_extra.append(_merge_req(table_start, tab_id, line_row, 0, 1, 4)) style_extra.append(_merge_req(table_start, tab_id, sg_row, 0, 1, 4)) # remove borders for blank/line/buyer rows (to look like separate text) nb = _border_none() # Mantener la linea negra debajo de SALDO A PAGAR: # si ponemos top=none en la fila en blanco, Google Docs puede ocultar el borde inferior # de la fila anterior (SALDO). Por eso dejamos top en NEGRO y el resto sin bordes. black_top = {"width": {"magnitude": 1.0, "unit": "PT"}, "dashStyle": "SOLID", "color": {"color": {"rgbColor": {"red": 0, "green": 0, "blue": 0}}}} br1 = _cell_borders_req(table_start, tab_id, blank_row, 0, 1, 4, top=black_top, bottom=nb, left=nb, right=nb) br2 = _cell_borders_req(table_start, tab_id, line_row, 0, 1, 4, top=nb, bottom=nb, left=nb, right=nb) br3 = _cell_borders_req(table_start, tab_id, sg_row, 0, 1, 4, top=nb, bottom=nb, left=nb, right=nb) for br in (br1, br2, br3): if br: style_extra.append(br) # center-align text in LINE row and COMPRADOR row; make COMPRADOR bold l_st, l_en = get_cell_text_range(table3, line_row, 0) style_extra.append({ "updateParagraphStyle": { "range": _range(l_st, l_en, tab_id), "paragraphStyle": {"alignment": "CENTER"}, "fields": "alignment", } }) s_st, s_en = get_cell_text_range(table3, sg_row, 0) style_extra.append({ "updateParagraphStyle": { "range": _range(s_st, s_en, tab_id), "paragraphStyle": {"alignment": "CENTER"}, "fields": "alignment", } }) style_extra.append({ "updateTextStyle": { "range": _range(s_st, s_en, tab_id), "textStyle": {"bold": True}, "fields": "bold", } }) _batch_update(docs_service, doc_id, style_extra) except Exception as _e: # debug no bloqueante _sv("dbg_signer_style_error", str(_e)) # Styles GREEN = _rgb(0.82, 0.91, 0.78) YELLOW = _rgb(1.0, 0.95, 0.45) GRAY = _rgb(0.90, 0.90, 0.90) style = [] # merge info rows col 1..3 for rr in range(0, 2): style.append(_merge_req(table_start, tab_id, rr, 1, 1, 3)) # EDIFICIO + DPTO green (rows 0,1) style.append(_cell_bg_req(table_start, tab_id, 0, 0, 1, 4, GREEN)) style.append(_cell_bg_req(table_start, tab_id, 1, 0, 1, 4, GREEN)) hdr = meta["header_row_index"] style.append(_cell_bg_req(table_start, tab_id, hdr, 0, 1, 4, GRAY)) style.append(_cell_bg_req(table_start, tab_id, meta["cost_total_row"], 0, 1, 4, YELLOW)) for tr in meta.get("title_rows", []): style.append(_cell_bg_req(table_start, tab_id, tr, 0, 1, 4, YELLOW)) style.append(_merge_req(table_start, tab_id, tr, 1, 1, 3)) for rr in meta.get("refuerzo_rows", []): style.append(_cell_bg_req(table_start, tab_id, rr, 0, 1, 4, YELLOW)) for rr in meta.get("merge_fecha_venc_rows", []): style.append(_merge_req(table_start, tab_id, rr, 2, 1, 2)) total_pagado_row = meta.get("total_pagado_row") saldo_row = meta.get("saldo_row") style.append(_cell_bg_req(table_start, tab_id, total_pagado_row, 0, 1, 4, YELLOW)) style.append(_cell_bg_req(table_start, tab_id, saldo_row, 0, 1, 4, YELLOW)) style.append(_merge_req(table_start, tab_id, total_pagado_row, 2, 1, 2)) style.append(_merge_req(table_start, tab_id, saldo_row, 2, 1, 2)) # ------------------------------------------------------------ # Layout tweaks (column widths + thinner rows) # ------------------------------------------------------------ # Nota: Google Docs usa puntos (PT). Ajustá estos números si querés afinar más. # Se reutilizan las medidas definidas al inicio de la inserción try: _sv("dbg_col_widths_pt", json.dumps(COL_WIDTHS_PT)) _sv("dbg_row_min_height_pt", str(ROW_MIN_HEIGHT_PT)) except Exception: pass # Anchos de columnas (todas las columnas, para evitar cortes y saltos de línea) for col_idx, w in COL_WIDTHS_PT.items(): style.append({ "updateTableColumnProperties": { "tableStartLocation": _loc(table_start, tab_id), "columnIndices": [int(col_idx)], "tableColumnProperties": { "widthType": "FIXED_WIDTH", "width": {"magnitude": float(w), "unit": "PT"}, }, # IMPORTANTE: incluir widthType en fields (si no, la API puede fallar) "fields": "widthType,width", } }) # Filas más finas (aplica a todas las filas con rowIndices: []) style.append({ "updateTableRowStyle": { "tableStartLocation": _loc(table_start, tab_id), "rowIndices": list(range(len(rows))), "tableRowStyle": { "minRowHeight": {"magnitude": float(ROW_MIN_HEIGHT_PT), "unit": "PT"} }, "fields": "minRowHeight", } }) # Header un poco más alto para evitar cortes si llegara a envolver try: hdr_i = int(meta.get("header_row_index")) style.append({ "updateTableRowStyle": { "tableStartLocation": _loc(table_start, tab_id), "rowIndices": [hdr_i], "tableRowStyle": {"minRowHeight": {"magnitude": 16.0, "unit": "PT"}}, "fields": "minRowHeight", } }) except Exception: pass # Menos padding en las celdas (reduce altura percibida) style.append({ "updateTableCellStyle": { "tableRange": { "tableCellLocation": { "tableStartLocation": _loc(table_start, tab_id), "rowIndex": 0, "columnIndex": 0, }, "rowSpan": int(len(rows)), "columnSpan": 4, }, "tableCellStyle": { "paddingLeft": {"magnitude": float(CELL_PADDING_LR_PT), "unit": "PT"}, "paddingRight": {"magnitude": float(CELL_PADDING_LR_PT), "unit": "PT"}, "paddingTop": {"magnitude": float(CELL_PADDING_TB_PT), "unit": "PT"}, "paddingBottom": {"magnitude": float(CELL_PADDING_TB_PT), "unit": "PT"}, }, "fields": "paddingLeft,paddingRight,paddingTop,paddingBottom", } }) if style: _batch_update(docs_service, doc_id, style) _sv("gdoc_status", "OK") _sv("gdoc_error", "") # ============================================================ # MAIN # ============================================================ marker = _gvs("gdoc_marker", "TABLA_PROFORMA") cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": raise RuntimeError("Falta gdoc_sa_json") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) if not os.path.exists(cred_path): raise RuntimeError("No existe credentials.json: " + cred_path) token_path = _gvs("gdoc_token_json", "") if token_path == "": token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") if not os.path.isabs(token_path): token_path = os.path.join(base_dir, token_path) docs_service, drive_service, _ = get_services(cred_path, token_path) url = _gvs("current_url", "") if url == "": raise RuntimeError("Falta current_url") doc_id = extract_doc_id_from_url(url) if doc_id == "": raise RuntimeError("No pude extraer documentId de la URL: " + url) doc_id_final, _, _ = ensure_docs_api_compatible(drive_service, doc_id) data = { "nombre_comprador": _gvs("nombre_comprador", ""), "numero_documento": _gvs("numero_documento", ""), "domicilio": _gvs("domicilio", ""), "telefono": _gvs("telefono", ""), "email": _gvs("email", ""), "proyecto": _gvs("proyecto", ""), "producto_odoo": _gvs("producto_odoo", ""), "plan_de_pagos": _gvs("plan_de_pagos", ""), "valor_de_compra": _gvs("valor_de_compra", ""), "valor_compra": _gvs("valor_compra", ""), "type_array": _gv("type_array", None), "name_array": _gv("name_array", None), "date_array": _gv("date_array", None), "amount_array": _gv("amount_array", None), } rows, meta = build_rows_from_arrays(data) insert_table_at_marker(docs_service, doc_id_final, marker, rows, meta) # Entrypoint try: run() except Exception as e: try: SetVar("gdoc_status", "ERROR") SetVar("gdoc_error", str(e)) except Exception: pass raise