diff --git a/plan_pagos_financiado_durante_obra.py b/plan_pagos_financiado_durante_obra.py new file mode 100644 index 0000000..64899b6 --- /dev/null +++ b/plan_pagos_financiado_durante_obra.py @@ -0,0 +1,1411 @@ +from __future__ import annotations + +import os +import sys + +# ============================================================ +# Rocketbot portable boot (py_libs) +# ============================================================ +base_dir = os.path.dirname(sys.executable) # ...\Rocketbot +libs_dir = os.path.join(base_dir, "py_libs", "py310") +sys.path.insert(0, libs_dir) + +# Rocketbot a veces ya trae pyparsing viejo en memoria +for k in list(sys.modules.keys()): + if k == "pyparsing" or k.startswith("pyparsing."): + del sys.modules[k] + + +def run(): + """ + v12.9 arrays -> tabla + totales + regla 'Durante Obra' (última cuota pre = 48HS) + + NUEVO (según pedido): + - Si {plan_de_pagos} == 'Durante Obra': + sumar todas las cuotas PRE (type_array = 'Pre entrega'), EXCEPTO la última cuota pre. + Si esa suma: + >= 80% de {valor_de_compra} + y < 100% de {valor_compra} + entonces: NO mostrar la fecha real de la última cuota pre; + en su lugar escribir '48HS ANTES DE LA ENTREGA' (y vaciar VENC). + (Incluye: merge FECHA DE PAGO + VENC en esa fila para que el texto se vea bien.) + + - TOTAL PAGADO / SALDO A PAGAR / VALOR TOTAL COMPRA: + se calculan por fechas (<=hoy vs >hoy) y se guardan en vars Rocketbot: + {total_pagado}, {saldo_a_pagar}, {valor_total_compra} + Siempre con 2 decimales (sin '$'). + + - Orden: SEÑA -> ENTREGA INICIAL -> TITULO CUOTAS PRE -> resto, insertando TITULO CUOTAS POST al empezar post. + Títulos y REFUERZOS en mayúscula. + """ + # Rocketbot scope-safety imports + import json + import re + import ast + import unicodedata + import datetime as dt + from decimal import Decimal, ROUND_HALF_UP + + # ---------------------------- + # Rocketbot vars helpers + # ---------------------------- + def _missing(v) -> bool: + if v is None: + return True + if isinstance(v, str): + s = v.strip() + return s == "" or s == "ERROR_NOT_VAR" + return False + + def _gv(name, default=None): + try: + v = GetVar(name) + except Exception: + return default + return default if _missing(v) else v + + def _gvs(name, default="") -> str: + v = _gv(name, default) + if v is None: + return default + s = str(v).strip() + return default if (s == "" or s == "ERROR_NOT_VAR") else s + + def _sv(name, value): + try: + SetVar(name, value) + except Exception: + pass + + # ---------------------------- + # Safe parsing of possibly huge list strings + # ---------------------------- + def _parse_any(v, default=None): + if v is None: + return default + if isinstance(v, (list, dict)): + return v + + s = str(v).strip() + if s == "" or s == "ERROR_NOT_VAR": + return default + + s = s.replace("\u200b", "").replace("\ufeff", "") + + # unwrap quoted json-ish + if len(s) >= 2 and (s[0] == s[-1]) and s[0] in ("'", '"'): + inner = s[1:-1].strip() + if inner.startswith("[") or inner.startswith("{"): + s = inner + + # json + try: + return json.loads(s) + except Exception: + pass + + # python literal + fixed = re.sub(r"\bnull\b", "None", s, flags=re.I) + fixed = re.sub(r"\btrue\b", "True", fixed, flags=re.I) + fixed = re.sub(r"\bfalse\b", "False", fixed, flags=re.I) + try: + return ast.literal_eval(fixed) + except Exception: + return default + + def _split_top_level_list(s: str): + """Split list-string into top-level elements without full parsing.""" + if not isinstance(s, str): + return [] + txt = s.strip() + if not txt.startswith("[") or not txt.endswith("]"): + return [] + inner = txt[1:-1].strip() + if inner == "": + return [] + + items, buf = [], [] + depth_sq = 0 # [] + depth_cu = 0 # {} + in_str = False + str_ch = "" + esc = False + + def flush(): + chunk = "".join(buf).strip() + if chunk: + items.append(chunk) + buf.clear() + + for ch in inner: + if in_str: + buf.append(ch) + if esc: + esc = False + continue + if ch == "\\": # escape + esc = True + continue + if ch == str_ch: + in_str = False + str_ch = "" + continue + + if ch in ("'", '"'): + in_str = True + str_ch = ch + buf.append(ch) + continue + + if ch == "[": + depth_sq += 1 + buf.append(ch) + continue + if ch == "]": + depth_sq = max(0, depth_sq - 1) + buf.append(ch) + continue + if ch == "{": + depth_cu += 1 + buf.append(ch) + continue + if ch == "}": + depth_cu = max(0, depth_cu - 1) + buf.append(ch) + continue + + if ch == "," and depth_sq == 0 and depth_cu == 0: + flush() + continue + + buf.append(ch) + + flush() + return items + + def _as_list_any(raw): + if raw is None: + return [] + if isinstance(raw, list): + return raw + parsed = _parse_any(raw, None) + if isinstance(parsed, list): + return parsed + if isinstance(parsed, dict): + return [parsed] + if isinstance(raw, str): + parts = _split_top_level_list(raw) + if parts: + return parts + return [] + + # ---------------------------- + # Extraction helpers + # ---------------------------- + DATE_RE1 = re.compile(r"\b(\d{1,2}/\d{1,2}/\d{4})\b") + DATE_RE2 = re.compile(r"\b(\d{4}-\d{2}-\d{2})\b") + DATE_RE3 = re.compile(r"\b(\d{8})\b") + + def _first_text(obj) -> str: + if obj is None: + return "" + if isinstance(obj, str): + s = obj.strip() + return "" if s == "None" else s + if isinstance(obj, (int, float, Decimal)): + return str(obj).strip() + + if isinstance(obj, dict): + for k in ("text", "name", "label", "value", "display_name"): + if k in obj and obj.get(k) not in (None, ""): + s = _first_text(obj.get(k)) + if s: + return s + for v in obj.values(): + s = _first_text(v) + if s: + return s + return "" + + if isinstance(obj, (list, tuple)): + for it in obj: + s = _first_text(it) + if s: + return s + return "" + + return str(obj).strip() + + def _extract_date(obj) -> str: + """Find dd/mm/yyyy or yyyy-mm-dd inside nested structures or element-string.""" + if obj is None: + return "" + + if isinstance(obj, str): + s = obj + m = DATE_RE1.search(s) + if m: + return m.group(1) + m = DATE_RE2.search(s) + if m: + return m.group(1) + m = DATE_RE3.search(s) + if m: + return m.group(1) + return s.strip() + + if isinstance(obj, dict): + for k in ("text", "date", "value", "name"): + if k in obj and obj.get(k) not in (None, ""): + s = _extract_date(obj.get(k)) + m = DATE_RE1.search(s) + if m: + return m.group(1) + m = DATE_RE2.search(s) + if m: + return m.group(1) + m = DATE_RE3.search(s) + if m: + return m.group(1) + for v in obj.values(): + s = _extract_date(v) + m = DATE_RE1.search(s) + if m: + return m.group(1) + m = DATE_RE2.search(s) + if m: + return m.group(1) + m = DATE_RE3.search(s) + if m: + return m.group(1) + return "" + + if isinstance(obj, (list, tuple)): + for it in obj: + s = _extract_date(it) + m = DATE_RE1.search(s) + if m: + return m.group(1) + m = DATE_RE2.search(s) + if m: + return m.group(1) + m = DATE_RE3.search(s) + if m: + return m.group(1) + try: + blob = json.dumps(obj, ensure_ascii=False) + except Exception: + blob = str(obj) + m = DATE_RE1.search(blob) + if m: + return m.group(1) + m = DATE_RE2.search(blob) + if m: + return m.group(1) + m = DATE_RE3.search(blob) + if m: + return m.group(1) + return "" + + s = str(obj) + m = DATE_RE1.search(s) + if m: + return m.group(1) + m = DATE_RE2.search(s) + if m: + return m.group(1) + m = DATE_RE3.search(s) + if m: + return m.group(1) + return s.strip() + + # ---------------------------- + # Money parsing/formatting + # ---------------------------- + def _to_decimal(val): + if val is None: + return None + if isinstance(val, Decimal): + return val + if isinstance(val, int): + return Decimal(val) + if isinstance(val, float): + return Decimal(str(val)) + + s = str(val).strip() + if s == "" or s == "None": + return None + + cleaned = [] + for ch in s: + if ch.isdigit() or ch in ".,-": + cleaned.append(ch) + s2 = "".join(cleaned) + if s2 == "" or s2 in ("-", ",", "."): + return None + + # "40.000,00" -> 40000.00 ; "40000.00" -> 40000.00 + if ("," in s2) and ("." in s2): + s2 = s2.replace(".", "").replace(",", ".") + elif "," in s2: + s2 = s2.replace(",", ".") + + try: + return Decimal(s2) + except Exception: + return None + + def _format_es(d: Decimal, decimals: int) -> str: + s = format(d, f",.{decimals}f") + s = s.replace(",", "X").replace(".", ",").replace("X", ".") + return s + + def _format_money_no_decimals(val): + d = _to_decimal(val) + if d is None: + return "" + d = d.quantize(Decimal("1"), rounding=ROUND_HALF_UP) + s = format(d, ",.0f").replace(",", ".") + return "$" + s + + def format_money_row(val): + return _format_money_no_decimals(val) + + def format_money_total(val): + return _format_money_no_decimals(val) + + def format_money_var_2dec(val): + d = _to_decimal(val) + if d is None: + return "" + return _format_es(d, 2) + + # ---------------------------- + # Date parse/normalize + # ---------------------------- + def _to_date_any(val): + if val is None: + return None + if isinstance(val, dt.datetime): + return val.date() + if isinstance(val, dt.date): + return val + + s = str(val).strip() + if s == "": + return None + + m = re.match(r"^(\d{1,2})/(\d{1,2})/(\d{4})$", s) + if m: + try: + dd = int(m.group(1)); mm = int(m.group(2)); yy = int(m.group(3)) + return dt.date(yy, mm, dd) + except Exception: + pass + + m2 = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", s[:10]) + if m2: + try: + return dt.date(int(m2.group(1)), int(m2.group(2)), int(m2.group(3))) + except Exception: + pass + + m3 = re.match(r"^(\d{4})(\d{2})(\d{2})$", s) + if m3: + try: + return dt.date(int(m3.group(1)), int(m3.group(2)), int(m3.group(3))) + except Exception: + pass + + s2 = s.replace("T", " ").replace("Z", "") + try: + return dt.datetime.fromisoformat(s2).date() + except Exception: + return None + + def _fmt_ddmmyyyy(d, fallback: str) -> str: + if d is None: + return (fallback or "").strip() + try: + return d.strftime("%d/%m/%Y") + except Exception: + return (fallback or "").strip() + + # ---------------------------- + # Concept classification / normalization + # ---------------------------- + def _ascii_norm(s): + if s is None: + return "" + s = unicodedata.normalize("NFKD", str(s)) + s = "".join(ch for ch in s if unicodedata.category(ch) != "Mn") + return s.lower().strip() + + def _is_refuerzo(type_text: str, name_text: str) -> bool: + t = _ascii_norm(type_text) + n = _ascii_norm(name_text) + return ("refuerzo" in t) or ("refuerzo" in n) or ("reforz" in t) or ("reforz" in n) + + def _is_sena(type_text: str, name_text: str) -> bool: + t = _ascii_norm(type_text) + n = _ascii_norm(name_text) + return ("sena" in t) or ("sena" in n) or ("seña" in t) or ("seña" in n) + + def _is_entrega_inicial(type_text: str, name_text: str) -> bool: + t = _ascii_norm(type_text) + n = _ascii_norm(name_text) + return ("entrega inicial" in t) or ("entrega inicial" in n) + + def _is_pre(type_text: str) -> bool: + t = _ascii_norm(type_text) + return ("pre entrega" in t) or ("preentrega" in t) or ("pre-entrega" in t) + + def _is_post(type_text: str) -> bool: + t = _ascii_norm(type_text) + return ("financi" in t) or ("post entrega" in t) or ("postentrega" in t) or ("post-entrega" in t) + + QUOTA_RE = re.compile(r"(?i)\bcuota\s*:\s*(\d+)") + QUOTA_RE2 = re.compile(r"(?i)\bcuota\s*n[°o]\s*(\d+)") + + def _concept_from_texts(type_text: str, name_text: str) -> str: + base = (name_text or type_text or "").strip() + if not base: + return "" + m = QUOTA_RE.search(base) or QUOTA_RE2.search(base) + if m: + return f"CUOTA N° {m.group(1)}" + return base.upper() + + # ============================================================ + # Google Auth / Drive / Docs helpers (imports inside) + # ============================================================ + SCOPES = {scopes_api_google} + + def _load_json(path): + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + def get_services(credentials_json_path, token_json_path): + from googleapiclient.discovery import build + from google.oauth2 import service_account + from google.oauth2.credentials import Credentials + from google_auth_oauthlib.flow import InstalledAppFlow + from google.auth.transport.requests import Request + + info = _load_json(credentials_json_path) + + if isinstance(info, dict) and info.get("type") == "service_account": + creds = service_account.Credentials.from_service_account_file(credentials_json_path, scopes=SCOPES) + docs = build("docs", "v1", credentials=creds, cache_discovery=False) + drive = build("drive", "v3", credentials=creds, cache_discovery=False) + return docs, drive, "service_account" + + creds = None + if os.path.exists(token_json_path): + creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) + + if (not creds) or (not creds.valid): + if creds and creds.expired and creds.refresh_token: + creds.refresh(Request()) + else: + flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) + try: + creds = flow.run_local_server(port=0) + except Exception: + creds = flow.run_console() + os.makedirs(os.path.dirname(token_json_path), exist_ok=True) + with open(token_json_path, "w", encoding="utf-8") as f: + f.write(creds.to_json()) + + docs = build("docs", "v1", credentials=creds, cache_discovery=False) + drive = build("drive", "v3", credentials=creds, cache_discovery=False) + return docs, drive, "oauth" + + def extract_doc_id_from_url(url): + m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") + return m.group(1) if m else "" + + def ensure_docs_api_compatible(drive_service, file_id: str): + meta = drive_service.files().get( + fileId=file_id, + fields="id,name,mimeType,shortcutDetails", + supportsAllDrives=True + ).execute() + + if meta.get("mimeType") == "application/vnd.google-apps.shortcut": + target = (meta.get("shortcutDetails") or {}).get("targetId", "") + if not target: + raise RuntimeError("Es shortcut pero no trae targetId.") + file_id = target + meta = drive_service.files().get( + fileId=file_id, + fields="id,name,mimeType", + supportsAllDrives=True + ).execute() + + if meta.get("mimeType") == "application/vnd.google-apps.document": + return file_id, meta, False + + new_name = (meta.get("name") or "Documento") + " (Google Docs)" + converted = drive_service.files().copy( + fileId=file_id, + body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, + fields="id,name,mimeType", + supportsAllDrives=True + ).execute() + return converted["id"], converted, True + + def docs_get(docs_service, doc_id: str): + try: + return docs_service.documents().get(documentId=doc_id, includeTabsContent=True).execute() + except TypeError: + return docs_service.documents().get(documentId=doc_id).execute() + + # ============================================================ + # Marker search (tabs-aware) + # ============================================================ + def _walk_tabs(tabs): + if not tabs: + return + for t in tabs: + yield t + for x in _walk_tabs(t.get("childTabs") or []): + yield x + + def _list_scopes(doc): + scopes = [("main", None)] + tabs = doc.get("tabs") or [] + for t in _walk_tabs(tabs): + tid = (t.get("tabProperties") or {}).get("tabId") + if tid is None: + continue + scopes.append(("tab", str(tid))) + return scopes + + def _get_body_content_for_scope(doc, scope_kind, scope_id): + if scope_kind == "main": + return (doc.get("body") or {}).get("content", []) or [] + tabs = doc.get("tabs") or [] + for t in _walk_tabs(tabs): + tid = (t.get("tabProperties") or {}).get("tabId") + if tid is None: + continue + if str(tid) == str(scope_id): + dtab = t.get("documentTab") or {} + return (dtab.get("body") or {}).get("content", []) or [] + return [] + + def _norm_alnum(s: str) -> str: + if s is None: + return "" + s = unicodedata.normalize("NFKD", str(s)).lower() + out = [] + for ch in s: + if unicodedata.category(ch) == "Mn": + continue + if ch.isalnum(): + out.append(ch) + return "".join(out) + + def _iter_text_chars(content_list): + def walk(lst): + for el in lst or []: + if not isinstance(el, dict): + continue + p = el.get("paragraph") + if p: + base = el.get("startIndex") or 0 + cursor = int(base) + for pe in (p.get("elements") or []): + tr = pe.get("textRun") + if not tr: + en = pe.get("endIndex") + if en is not None: + cursor = int(en) + continue + txt = tr.get("content", "") or "" + st = pe.get("startIndex") or cursor + st = int(st) + for i, ch in enumerate(txt): + if ch in ("\u200b", "\ufeff"): + continue + yield (st + i, ch) + en = pe.get("endIndex") + cursor = int(en) if en is not None else st + len(txt) + continue + t = el.get("table") + if t: + for row in (t.get("tableRows") or []): + for cell in (row.get("tableCells") or []): + yield from walk(cell.get("content", [])) + continue + toc = el.get("tableOfContents") + if toc: + yield from walk(toc.get("content", [])) + continue + yield from walk(content_list) + + def find_marker(doc, marker_raw: str): + mr = (marker_raw or "").strip() + candidates = [] + for m in [mr, mr.strip("_"), "TABLA_PROFORMA", "__TABLA_PROFORMA__", "TABLA PROFORMA", "{TABLA_PROFORMA}", "{" + mr + "}"]: + if m and m not in candidates: + candidates.append(m) + + for scope_kind, scope_id in _list_scopes(doc): + content = _get_body_content_for_scope(doc, scope_kind, scope_id) + raw_chars, raw_map = [], [] + for doc_i, ch in _iter_text_chars(content): + raw_chars.append(ch) + raw_map.append(int(doc_i)) + raw_text = "".join(raw_chars) + + for m in candidates: + pos = raw_text.find(m) + if pos != -1: + sdoc = raw_map[pos] + edoc = raw_map[pos + len(m) - 1] + 1 + return {"hit": {"scope_kind": scope_kind, "tabId": scope_id, "start": sdoc, "end": edoc, "used": m}, "candidates": candidates} + pos2 = raw_text.lower().find(m.lower()) + if pos2 != -1: + sdoc = raw_map[pos2] + edoc = raw_map[pos2 + len(m) - 1] + 1 + return {"hit": {"scope_kind": scope_kind, "tabId": scope_id, "start": sdoc, "end": edoc, "used": m}, "candidates": candidates} + + alnum, alnum_map = [], [] + for i, ch in enumerate(raw_chars): + if ch.isalnum(): + alnum.append(ch.lower()) + alnum_map.append(raw_map[i]) + alnum_text = "".join(alnum) + + for m in candidates: + target = _norm_alnum(m) + if not target: + continue + pos = alnum_text.find(target) + if pos != -1: + sdoc = alnum_map[pos] + edoc = alnum_map[pos + len(target) - 1] + 1 + return {"hit": {"scope_kind": scope_kind, "tabId": scope_id, "start": sdoc, "end": edoc, "used": m}, "candidates": candidates} + + return {"hit": None, "candidates": candidates} + + # ============================================================ + # Docs batchUpdate helpers + # ============================================================ + def _loc(index, tab_id): + d = {"index": int(index)} + if tab_id is not None: + d["tabId"] = str(tab_id) + return d + + def _range(start, end, tab_id): + d = {"startIndex": int(start), "endIndex": int(end)} + if tab_id is not None: + d["tabId"] = str(tab_id) + return d + + def _rgb(r, g, b): + return {"color": {"rgbColor": {"red": r, "green": g, "blue": b}}} + + def _merge_req(table_start, tab_id, row, col, row_span, col_span): + loc = {"tableStartLocation": _loc(table_start, tab_id), "rowIndex": row, "columnIndex": col} + tr = {"tableCellLocation": loc, "rowSpan": row_span, "columnSpan": col_span} + return {"mergeTableCells": {"tableRange": tr}} + + def _cell_bg_req(table_start, tab_id, row, col, row_span, col_span, rgb): + loc = {"tableStartLocation": _loc(table_start, tab_id), "rowIndex": row, "columnIndex": col} + tr = {"tableCellLocation": loc, "rowSpan": row_span, "columnSpan": col_span} + style = {"backgroundColor": rgb} + return {"updateTableCellStyle": {"tableRange": tr, "tableCellStyle": style, "fields": "backgroundColor"}} + + + def _cell_borders_req(table_start, tab_id, row, col, row_span, col_span, *, + top=None, bottom=None, left=None, right=None): + """Update borders for a table cell range. Pass border dicts or None (keeps existing).""" + loc = {"tableStartLocation": _loc(table_start, tab_id), "rowIndex": row, "columnIndex": col} + tr = {"tableCellLocation": loc, "rowSpan": row_span, "columnSpan": col_span} + style = {} + fields = [] + + if top is not None: + style["borderTop"] = top + fields.append("borderTop") + if bottom is not None: + style["borderBottom"] = bottom + fields.append("borderBottom") + if left is not None: + style["borderLeft"] = left + fields.append("borderLeft") + if right is not None: + style["borderRight"] = right + fields.append("borderRight") + + if not fields: + return None + + return {"updateTableCellStyle": {"tableRange": tr, "tableCellStyle": style, "fields": ",".join(fields)}} + + def _border_none(): + # Border "invisible": width 0 + return {"width": {"magnitude": 0, "unit": "PT"}, "dashStyle": "SOLID", "color": {"color": {"rgbColor": {"red": 1, "green": 1, "blue": 1}}}} + + def _batch_update(docs_service, doc_id, reqs, chunk=350): + for i in range(0, len(reqs), chunk): + docs_service.documents().batchUpdate( + documentId=doc_id, + body={"requests": reqs[i:i + chunk]} + ).execute() + + def get_first_table_after_index(doc, scope_kind, tab_id, index_hint): + body = _get_body_content_for_scope(doc, scope_kind, tab_id) + best_table = None + best_start = None + for el in body: + table = el.get("table") + if not table: + continue + st = el.get("startIndex", 0) + if st >= index_hint: + if best_start is None or st < best_start: + best_start = st + best_table = table + return (best_table, best_start) + + def get_cell_insert_index(table, r, c): + cell = table["tableRows"][r]["tableCells"][c] + content = cell.get("content", []) + for el in content: + if el.get("paragraph") is not None: + en = el.get("endIndex") + if en is not None: + return int(en) - 1 + for el in content: + en = el.get("endIndex") + if en is not None: + return int(en) - 1 + raise RuntimeError(f"No pude calcular indice de insercion para celda ({r},{c}).") + + + def get_cell_text_range(table, r, c): + """Return (min_start, max_end) indices for the given cell content.""" + cell = table["tableRows"][r]["tableCells"][c] + mins = None + maxe = None + for el in (cell.get("content") or []): + st = el.get("startIndex") + en = el.get("endIndex") + if st is not None: + mins = int(st) if mins is None else min(mins, int(st)) + if en is not None: + maxe = int(en) if maxe is None else max(maxe, int(en)) + if mins is None or maxe is None: + # fallback: use insert index + idx = get_cell_insert_index(table, r, c) + return idx, idx + 1 + return mins, maxe + + # ============================================================ + # Build rows + # ============================================================ + def _base_info_rows(data): + rows = [] + def add_info(label, value): + rows.append([label, str(value) if value is not None else "", "", ""]) + add_info("EDIFICIO:", data.get("proyecto", "")) + add_info("DPTO.:", data.get("producto_odoo", "")) + return rows + + def build_rows_from_arrays(data): + type_array = _as_list_any(data.get("type_array")) + name_array = _as_list_any(data.get("name_array")) + date_array = _as_list_any(data.get("date_array")) + amount_array = _as_list_any(data.get("amount_array")) + + lens = [len(type_array), len(name_array), len(date_array), len(amount_array)] + total_len = min(lens) if lens else 0 + _sv("dbg_total_len", str(total_len)) + + entries = [] + for i in range(total_len): + ttxt = _first_text(type_array[i]) + ntxt = _first_text(name_array[i]) + dtext = _extract_date(date_array[i]) + atext = _first_text(amount_array[i]) + amt = _to_decimal(atext) + entries.append({"i": i, "type_text": ttxt, "name_text": ntxt, "date_text": dtext, "amount_dec": amt}) + + sena_item = next((e for e in entries if _is_sena(e["type_text"], e["name_text"])), None) + entrega_item = next((e for e in entries if _is_entrega_inicial(e["type_text"], e["name_text"])), None) + + remaining = [] + for e in entries: + if sena_item is not None and e["i"] == sena_item["i"]: + continue + if entrega_item is not None and e["i"] == entrega_item["i"]: + continue + remaining.append(e) + + rows = _base_info_rows(data) + header_row_index = len(rows) + rows.append(["COSTO TOTAL", "MONTO", "FECHA DE PAGO", "VENC"]) + + cost_total_row_idx = len(rows) + rows.append(["COSTO TOTAL", "", "", ""]) + + title_rows = [] + refuerzo_rows = [] + merge_fecha_venc_rows = [] + + today = dt.date.today() + total_pagado = Decimal("0") + saldo_a_pagar = Decimal("0") + + pre_quota_rows = [] # [(row_idx, amt_dec), ...] cuotas PRE (type Pre entrega, no refuerzo) + + def add_payment_row(concepto, amt_dec, date_text, mark_refuerzo=False, force_paid=False): + nonlocal total_pagado, saldo_a_pagar + d_obj = _to_date_any(date_text) + shown = _fmt_ddmmyyyy(d_obj, date_text) + + fecha_pago = "" + venc = "" + if d_obj is not None: + if d_obj <= today: + fecha_pago = shown + else: + venc = shown + else: + venc = shown if shown else "SIN FECHA" + + monto_fmt = format_money_row(amt_dec) if (amt_dec is not None and amt_dec > 0) else "" + + if amt_dec is not None and amt_dec > 0: + if force_paid: + total_pagado += amt_dec + elif fecha_pago: + total_pagado += amt_dec + else: + saldo_a_pagar += amt_dec + + rows.append([concepto, monto_fmt, fecha_pago, venc]) + row_idx = len(rows) - 1 + if mark_refuerzo: + refuerzo_rows.append(row_idx) + return row_idx + + # Orden fijo: SEÑA -> ENTREGA INICIAL + if sena_item is not None: + add_payment_row("SEÑA", sena_item["amount_dec"], sena_item["date_text"], False, force_paid=True) + + if entrega_item is not None: + add_payment_row("ENTREGA INICIAL", entrega_item["amount_dec"], entrega_item["date_text"], False, force_paid=True) + + pre_title_added = False + post_title_added = False + ref_no = 1 + + for e in remaining: + ttxt = e["type_text"] + ntxt = e["name_text"] + dtext = e["date_text"] + amt_dec = e["amount_dec"] + + is_ref = _is_refuerzo(ttxt, ntxt) + is_post = _is_post(ttxt) + group = "post" if is_post else "pre" + + if group == "pre" and not pre_title_added: + title_rows.append(len(rows)) + rows.append(["CUOTAS PRE ENTREGA", "", "", ""]) + pre_title_added = True + + if group == "post" and not post_title_added: + title_rows.append(len(rows)) + rows.append(["CUOTAS POST ENTREGA", "", "", ""]) + post_title_added = True + + if is_ref: + add_payment_row(f"{ref_no}° REFUERZO", amt_dec, dtext, True) + ref_no += 1 + else: + concepto = _concept_from_texts(ttxt, ntxt) + row_idx = add_payment_row(concepto, amt_dec, dtext, False) + if _is_pre(ttxt): + pre_quota_rows.append((row_idx, amt_dec)) + + # ---- Regla DURANTE OBRA ---- + plan = _ascii_norm(data.get("plan_de_pagos", "")) + is_durante_obra = (plan.replace(" ", "") == "duranteobra") + + base80 = _to_decimal(data.get("valor_de_compra")) + base100 = _to_decimal(data.get("valor_compra")) + + # fallback: si no hay valor_compra, usar valor_total_compra calculado + # (pero para la regla, si no hay base80 tampoco, no se aplica) + if is_durante_obra and len(pre_quota_rows) >= 2 and base80 is not None: + sum_pre_except_last = Decimal("0") + for (_, amt) in pre_quota_rows[:-1]: + if amt is not None and amt > 0: + sum_pre_except_last += amt + + last_row_idx, _ = pre_quota_rows[-1] + thr80 = base80 * Decimal("0.8") + + if base100 is None: + # provisional: usar el total calculado más abajo + pass + else: + if (sum_pre_except_last >= thr80) and (sum_pre_except_last < base100): + rows[last_row_idx][2] = "48HS ANTES DE LA ENTREGA" + rows[last_row_idx][3] = "" + merge_fecha_venc_rows.append(last_row_idx) + + _sv("dbg_sum_pre_except_last", str(sum_pre_except_last)) + _sv("dbg_thr80", str(thr80)) + _sv("dbg_valor_de_compra_used", str(base80)) + _sv("dbg_valor_compra_used", str(base100) if base100 is not None else "") + + # ---- Totales ---- + valor_total_compra = total_pagado + saldo_a_pagar + + # si base100 faltaba, re-evaluar la condición usando valor_total_compra como 100% + if is_durante_obra and len(pre_quota_rows) >= 2 and base80 is not None and base100 is None: + base100 = valor_total_compra + sum_pre_except_last = _to_decimal(_gvs("dbg_sum_pre_except_last", "0")) or Decimal("0") + thr80 = base80 * Decimal("0.8") + last_row_idx, _ = pre_quota_rows[-1] + if (sum_pre_except_last >= thr80) and (sum_pre_except_last < base100): + rows[last_row_idx][2] = "48HS ANTES DE LA ENTREGA" + rows[last_row_idx][3] = "" + if last_row_idx not in merge_fecha_venc_rows: + merge_fecha_venc_rows.append(last_row_idx) + _sv("dbg_valor_compra_used", str(base100)) + + rows[cost_total_row_idx][1] = format_money_total(valor_total_compra) + + total_pagado_row_idx = len(rows) + rows.append(["TOTAL PAGADO", format_money_total(total_pagado), "", ""]) + saldo_row_idx = len(rows) + rows.append(["SALDO A PAGAR", format_money_total(saldo_a_pagar), "", ""]) + + # Filas debajo de la tabla (sin bordes): + # 1) fila en blanco (separación) + # 2) fila con línea de firma (guiones bajos) + # 3) fila con texto COMPRADOR + blank_row_idx = len(rows) + rows.append(["", "", "", ""]) + line_row_idx = len(rows) + rows.append(["_______________________________________________", "", "", ""]) + buyer_row_idx = len(rows) + rows.append(["COMPRADOR", "", "", ""]) + + # Vars Rocketbot (sin $) + _sv("total_pagado", format_money_var_2dec(total_pagado)) + _sv("saldo_a_pagar", format_money_var_2dec(saldo_a_pagar)) + _sv("valor_total_compra_financiado", format_money_var_2dec(valor_total_compra)) + + meta = { + "header_row_index": header_row_index, + "cost_total_row": cost_total_row_idx, + "title_rows": title_rows, + "refuerzo_rows": refuerzo_rows, + "merge_fecha_venc_rows": merge_fecha_venc_rows, + "total_pagado_row": total_pagado_row_idx, + "saldo_row": saldo_row_idx, + "blank_row": blank_row_idx, + "line_row": line_row_idx, + "signer_row": buyer_row_idx, + } + return rows, meta + + # ============================================================ + # Insert table + # ============================================================ + def insert_table_at_marker(docs_service, doc_id, marker, rows, meta): + doc = docs_get(docs_service, doc_id) + + found = find_marker(doc, marker) + hit = (found or {}).get("hit") + if not hit: + _sv("gdoc_status", "ERROR") + _sv("gdoc_error", "No encontre el marcador: " + str(marker)) + raise RuntimeError("No encontre el marcador en el documento: " + str(marker)) + + scope_kind = hit.get("scope_kind", "main") + tab_id = hit.get("tabId") + start = int(hit["start"]) + end = int(hit["end"]) + + reqs = [] + reqs.append({"deleteContentRange": {"range": _range(start, end, tab_id)}}) + + # Layout base (4 columnas) + COL_WIDTHS_PT = { + 0: 150, # CONCEPTO + 1: 85, # MONTO + 2: 95, # FECHA DE PAGO + 3: 95, # VENC + } + ROW_MIN_HEIGHT_PT = 11 + CELL_PADDING_LR_PT = 1.5 + CELL_PADDING_TB_PT = 0.5 + + def _dim_pt(dim, default=0.0): + try: + if not dim: + return float(default) + mag = dim.get("magnitude", default) + return float(mag) + except Exception: + return float(default) + + # --- Center table on page (best-effort) --- + # Google Docs UI supports table alignment/indent, but the Docs API doesn't expose a direct + # 'tableAlignment' field. A common workaround is to set the paragraph indent at the insertion point. + # (See: community/StackOverflow discussions about table alignment via updateParagraphStyle.) + try: + doc_style = (doc.get("documentStyle") or {}) + page_w = _dim_pt(((doc_style.get("pageSize") or {}).get("width")), 612.0) + m_left = _dim_pt(doc_style.get("marginLeft"), 72.0) + m_right = _dim_pt(doc_style.get("marginRight"), 72.0) + content_w = max(0.0, page_w - m_left - m_right) + + # Table width in points = sum of our explicit column widths (already set later). + # If column widths are not set for all columns, fall back to 0 (no indent). + table_w = 0.0 + try: + table_w = float(sum(COL_WIDTHS_PT.values())) + except Exception: + table_w = 0.0 + + indent = (content_w - table_w) / 2.0 if content_w > 0 and table_w > 0 else 0.0 + if indent < 0: + indent = 0.0 + + # Apply to the paragraph where the marker lives (range start..start+1 is enough to overlap it). + reqs.append({ + "updateParagraphStyle": { + "range": _range(start, start + 1, tab_id), + "paragraphStyle": { + "indentStart": {"unit": "PT", "magnitude": indent}, + "alignment": "START" + }, + "fields": "indentStart,alignment" + } + }) + _sv("dbg_table_center_indent_pt", str(indent)) + except Exception as _e: + _sv("dbg_table_center_indent_pt", "0") + + reqs.append({"insertTable": {"rows": len(rows), "columns": 4, "location": _loc(start, tab_id)}}) + + _batch_update(docs_service, doc_id, reqs) + + doc2 = docs_get(docs_service, doc_id) + table, table_start = get_first_table_after_index(doc2, scope_kind, tab_id, start) + if not table: + raise RuntimeError("Inserte la tabla pero no pude ubicarla al releer el doc.") + + # Fill text: high->low indices + fill_items = [] + for r in range(len(rows)): + for c in range(4): + txt = str(rows[r][c] or "") + if txt.strip() == "": + continue + idx = get_cell_insert_index(table, r, c) + fill_items.append((idx, {"insertText": {"location": _loc(idx, tab_id), "text": txt}})) + if fill_items: + fill_items.sort(key=lambda x: x[0], reverse=True) + _batch_update(docs_service, doc_id, [req for _, req in fill_items]) + + # Releer para poder alinear/bordes en filas especiales (spacer + COMPRADOR) + doc3 = docs_get(docs_service, doc_id) + table3, _ = get_first_table_after_index(doc3, scope_kind, tab_id, start) + if table3: + try: + blank_row = int(meta.get("blank_row")) + line_row = int(meta.get("line_row")) + sg_row = int(meta.get("signer_row")) + + # merge blank + line + buyer across all 4 columns + style_extra = [] + + # Línea negra debajo de SALDO A PAGAR (antes de zona sin líneas) + try: + saldo_row = int(meta.get("saldo_row")) + black = {"width": {"magnitude": 1.0, "unit": "PT"}, + "dashStyle": "SOLID", + "color": {"color": {"rgbColor": {"red": 0, "green": 0, "blue": 0}}}} + br = _cell_borders_req(table_start, tab_id, saldo_row, 0, 1, 4, bottom=black) + if br: + style_extra.append(br) + except Exception: + pass + + # Alineación: + # - Header (títulos) en col 2 y 3: IZQUIERDA (como el resto) + # - Filas de datos en col 2 y 3: CENTRADO + try: + hdr_row = int(meta.get("header_row_index")) + last_row = int(meta.get("saldo_row")) # hasta SALDO (inclusive) + + # Header: left + for cc in (2, 3): + st, en = get_cell_text_range(table3, hdr_row, cc) + if st is not None and en is not None and int(en) > int(st): + style_extra.append({ + "updateParagraphStyle": { + "range": _range(st, en, tab_id), + "paragraphStyle": {"alignment": "START"}, + "fields": "alignment", + } + }) + + # Data rows: center (desde hdr+1) + for rr in range(hdr_row + 1, last_row + 1): + for cc in (2, 3): + st, en = get_cell_text_range(table3, rr, cc) + if st is None or en is None or int(en) <= int(st): + continue + style_extra.append({ + "updateParagraphStyle": { + "range": _range(st, en, tab_id), + "paragraphStyle": {"alignment": "CENTER"}, + "fields": "alignment", + } + }) + except Exception: + pass + + style_extra.append(_merge_req(table_start, tab_id, blank_row, 0, 1, 4)) + style_extra.append(_merge_req(table_start, tab_id, line_row, 0, 1, 4)) + style_extra.append(_merge_req(table_start, tab_id, sg_row, 0, 1, 4)) + + # remove borders for blank/line/buyer rows (to look like separate text) + nb = _border_none() + # Mantener la linea negra debajo de SALDO A PAGAR: + # si ponemos top=none en la fila en blanco, Google Docs puede ocultar el borde inferior + # de la fila anterior (SALDO). Por eso dejamos top en NEGRO y el resto sin bordes. + black_top = {"width": {"magnitude": 1.0, "unit": "PT"}, + "dashStyle": "SOLID", + "color": {"color": {"rgbColor": {"red": 0, "green": 0, "blue": 0}}}} + br1 = _cell_borders_req(table_start, tab_id, blank_row, 0, 1, 4, top=black_top, bottom=nb, left=nb, right=nb) + br2 = _cell_borders_req(table_start, tab_id, line_row, 0, 1, 4, top=nb, bottom=nb, left=nb, right=nb) + br3 = _cell_borders_req(table_start, tab_id, sg_row, 0, 1, 4, top=nb, bottom=nb, left=nb, right=nb) + for br in (br1, br2, br3): + if br: + style_extra.append(br) + + # center-align text in LINE row and COMPRADOR row; make COMPRADOR bold + l_st, l_en = get_cell_text_range(table3, line_row, 0) + style_extra.append({ + "updateParagraphStyle": { + "range": _range(l_st, l_en, tab_id), + "paragraphStyle": {"alignment": "CENTER"}, + "fields": "alignment", + } + }) + + s_st, s_en = get_cell_text_range(table3, sg_row, 0) + style_extra.append({ + "updateParagraphStyle": { + "range": _range(s_st, s_en, tab_id), + "paragraphStyle": {"alignment": "CENTER"}, + "fields": "alignment", + } + }) + style_extra.append({ + "updateTextStyle": { + "range": _range(s_st, s_en, tab_id), + "textStyle": {"bold": True}, + "fields": "bold", + } + }) + + _batch_update(docs_service, doc_id, style_extra) + except Exception as _e: + # debug no bloqueante + _sv("dbg_signer_style_error", str(_e)) + + # Styles + GREEN = _rgb(0.82, 0.91, 0.78) + YELLOW = _rgb(1.0, 0.95, 0.45) + GRAY = _rgb(0.90, 0.90, 0.90) + + style = [] + # merge info rows col 1..3 + for rr in range(0, 2): + style.append(_merge_req(table_start, tab_id, rr, 1, 1, 3)) + # EDIFICIO + DPTO green (rows 0,1) + style.append(_cell_bg_req(table_start, tab_id, 0, 0, 1, 4, GREEN)) + style.append(_cell_bg_req(table_start, tab_id, 1, 0, 1, 4, GREEN)) + + hdr = meta["header_row_index"] + style.append(_cell_bg_req(table_start, tab_id, hdr, 0, 1, 4, GRAY)) + + style.append(_cell_bg_req(table_start, tab_id, meta["cost_total_row"], 0, 1, 4, YELLOW)) + + for tr in meta.get("title_rows", []): + style.append(_cell_bg_req(table_start, tab_id, tr, 0, 1, 4, YELLOW)) + style.append(_merge_req(table_start, tab_id, tr, 1, 1, 3)) + + for rr in meta.get("refuerzo_rows", []): + style.append(_cell_bg_req(table_start, tab_id, rr, 0, 1, 4, YELLOW)) + + for rr in meta.get("merge_fecha_venc_rows", []): + style.append(_merge_req(table_start, tab_id, rr, 2, 1, 2)) + + total_pagado_row = meta.get("total_pagado_row") + saldo_row = meta.get("saldo_row") + style.append(_cell_bg_req(table_start, tab_id, total_pagado_row, 0, 1, 4, YELLOW)) + style.append(_cell_bg_req(table_start, tab_id, saldo_row, 0, 1, 4, YELLOW)) + style.append(_merge_req(table_start, tab_id, total_pagado_row, 2, 1, 2)) + style.append(_merge_req(table_start, tab_id, saldo_row, 2, 1, 2)) + + + # ------------------------------------------------------------ + # Layout tweaks (column widths + thinner rows) + # ------------------------------------------------------------ + # Nota: Google Docs usa puntos (PT). Ajustá estos números si querés afinar más. + # Se reutilizan las medidas definidas al inicio de la inserción + + try: + _sv("dbg_col_widths_pt", json.dumps(COL_WIDTHS_PT)) + _sv("dbg_row_min_height_pt", str(ROW_MIN_HEIGHT_PT)) + except Exception: + pass + + # Anchos de columnas (todas las columnas, para evitar cortes y saltos de línea) + for col_idx, w in COL_WIDTHS_PT.items(): + style.append({ + "updateTableColumnProperties": { + "tableStartLocation": _loc(table_start, tab_id), + "columnIndices": [int(col_idx)], + "tableColumnProperties": { + "widthType": "FIXED_WIDTH", + "width": {"magnitude": float(w), "unit": "PT"}, + }, + # IMPORTANTE: incluir widthType en fields (si no, la API puede fallar) + "fields": "widthType,width", + } + }) + + # Filas más finas (aplica a todas las filas con rowIndices: []) + style.append({ + "updateTableRowStyle": { + "tableStartLocation": _loc(table_start, tab_id), + "rowIndices": list(range(len(rows))), + "tableRowStyle": { + "minRowHeight": {"magnitude": float(ROW_MIN_HEIGHT_PT), "unit": "PT"} + }, + "fields": "minRowHeight", + } + }) + + # Header un poco más alto para evitar cortes si llegara a envolver + try: + hdr_i = int(meta.get("header_row_index")) + style.append({ + "updateTableRowStyle": { + "tableStartLocation": _loc(table_start, tab_id), + "rowIndices": [hdr_i], + "tableRowStyle": {"minRowHeight": {"magnitude": 16.0, "unit": "PT"}}, + "fields": "minRowHeight", + } + }) + except Exception: + pass + + # Menos padding en las celdas (reduce altura percibida) + style.append({ + "updateTableCellStyle": { + "tableRange": { + "tableCellLocation": { + "tableStartLocation": _loc(table_start, tab_id), + "rowIndex": 0, + "columnIndex": 0, + }, + "rowSpan": int(len(rows)), + "columnSpan": 4, + }, + "tableCellStyle": { + "paddingLeft": {"magnitude": float(CELL_PADDING_LR_PT), "unit": "PT"}, + "paddingRight": {"magnitude": float(CELL_PADDING_LR_PT), "unit": "PT"}, + "paddingTop": {"magnitude": float(CELL_PADDING_TB_PT), "unit": "PT"}, + "paddingBottom": {"magnitude": float(CELL_PADDING_TB_PT), "unit": "PT"}, + }, + "fields": "paddingLeft,paddingRight,paddingTop,paddingBottom", + } + }) + + if style: + _batch_update(docs_service, doc_id, style) + + _sv("gdoc_status", "OK") + _sv("gdoc_error", "") + + # ============================================================ + # MAIN + # ============================================================ + marker = _gvs("gdoc_marker", "TABLA_PROFORMA") + + cred_path = _gvs("gdoc_sa_json", "") + if cred_path == "": + raise RuntimeError("Falta gdoc_sa_json") + if not os.path.isabs(cred_path): + cred_path = os.path.join(base_dir, cred_path) + if not os.path.exists(cred_path): + raise RuntimeError("No existe credentials.json: " + cred_path) + + token_path = _gvs("gdoc_token_json", "") + if token_path == "": + token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") + if not os.path.isabs(token_path): + token_path = os.path.join(base_dir, token_path) + + docs_service, drive_service, _ = get_services(cred_path, token_path) + + url = _gvs("current_url", "") + if url == "": + raise RuntimeError("Falta current_url") + doc_id = extract_doc_id_from_url(url) + if doc_id == "": + raise RuntimeError("No pude extraer documentId de la URL: " + url) + doc_id_final, _, _ = ensure_docs_api_compatible(drive_service, doc_id) + + data = { + "nombre_comprador": _gvs("nombre_comprador", ""), + "numero_documento": _gvs("numero_documento", ""), + "domicilio": _gvs("domicilio", ""), + "telefono": _gvs("telefono", ""), + "email": _gvs("email", ""), + "proyecto": _gvs("proyecto", ""), + "producto_odoo": _gvs("producto_odoo", ""), + + "plan_de_pagos": _gvs("plan_de_pagos", ""), + "valor_de_compra": _gvs("valor_de_compra", ""), + "valor_compra": _gvs("valor_compra", ""), + + "type_array": _gv("type_array", None), + "name_array": _gv("name_array", None), + "date_array": _gv("date_array", None), + "amount_array": _gv("amount_array", None), + } + + rows, meta = build_rows_from_arrays(data) + insert_table_at_marker(docs_service, doc_id_final, marker, rows, meta) + + +# Entrypoint +try: + run() +except Exception as e: + try: + SetVar("gdoc_status", "ERROR") + SetVar("gdoc_error", str(e)) + except Exception: + pass + raise \ No newline at end of file diff --git a/script_unificado_firma_digital.py b/script_unificado_firma_digital.py new file mode 100644 index 0000000..a3ae38c --- /dev/null +++ b/script_unificado_firma_digital.py @@ -0,0 +1,393 @@ +from __future__ import annotations + +import warnings +warnings.simplefilter("ignore", ResourceWarning) + +import os +import sys +import json +import re + +# -------------------- +# Boot libs (Rocketbot portable) +# -------------------- +base_dir = os.path.dirname(sys.executable) +libs_dir = os.path.join(base_dir, "py_libs", "py310") +sys.path.insert(0, libs_dir) + +for k in list(sys.modules.keys()): + if k == "pyparsing" or k.startswith("pyparsing."): + del sys.modules[k] + +from googleapiclient.discovery import build +from google.oauth2 import service_account +from google.oauth2.credentials import Credentials +from google_auth_oauthlib.flow import InstalledAppFlow +from google.auth.transport.requests import Request as GRequest + + +# ----------------------------------------------------------- +# GOOGLE AUTH +# ----------------------------------------------------------- +SCOPES = {scopes_api_google} + + +def _load_json(path): + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +def get_services(credentials_json_path, token_json_path): + info = _load_json(credentials_json_path) + + if isinstance(info, dict) and info.get("type") == "service_account": + creds = service_account.Credentials.from_service_account_file( + credentials_json_path, + scopes=SCOPES, + ) + docs = build("docs", "v1", credentials=creds, cache_discovery=False) + drive = build("drive", "v3", credentials=creds, cache_discovery=False) + return docs, drive, "service_account" + + creds = None + if os.path.exists(token_json_path): + creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) + + if (not creds) or (not creds.valid): + if creds and creds.expired and creds.refresh_token: + creds.refresh(GRequest()) + else: + flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) + try: + creds = flow.run_local_server(port=0) + except Exception: + creds = flow.run_console() + + os.makedirs(os.path.dirname(token_json_path), exist_ok=True) + with open(token_json_path, "w", encoding="utf-8") as f: + f.write(creds.to_json()) + + docs = build("docs", "v1", credentials=creds, cache_discovery=False) + drive = build("drive", "v3", credentials=creds, cache_discovery=False) + return docs, drive, "oauth" + + +# ----------------------------------------------------------- +# ROCKETBOT VAR HELPERS +# ----------------------------------------------------------- +def _sv(name, value): + try: + SetVar(name, value) + except Exception: + pass + + +def _missing(v) -> bool: + if v is None: + return True + if isinstance(v, str): + s = v.strip() + return s == "" or s == "ERROR_NOT_VAR" + return False + + +def _gv(name, default=None): + try: + v = GetVar(name) + except Exception: + return default + return default if _missing(v) else v + + +def _gvs(name, default="") -> str: + v = _gv(name, default) + if v is None: + return default + s = str(v).strip() + return default if (s == "" or s == "ERROR_NOT_VAR") else s + + +# ----------------------------------------------------------- +# DRIVE / DOC HELPERS +# ----------------------------------------------------------- +def extract_doc_id_from_url(url): + m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") + return m.group(1) if m else "" + + + +def ensure_docs_api_compatible(drive_service, file_id: str): + meta = drive_service.files().get( + fileId=file_id, + fields="id,name,mimeType,shortcutDetails", + supportsAllDrives=True, + ).execute() + + if meta.get("mimeType") == "application/vnd.google-apps.shortcut": + target = (meta.get("shortcutDetails") or {}).get("targetId", "") + if not target: + raise RuntimeError("Es shortcut pero no trae targetId.") + file_id = target + meta = drive_service.files().get( + fileId=file_id, + fields="id,name,mimeType", + supportsAllDrives=True, + ).execute() + + if meta.get("mimeType") != "application/vnd.google-apps.document": + new_name = (meta.get("name") or "Documento") + " (Google Docs)" + converted = drive_service.files().copy( + fileId=file_id, + body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, + fields="id,name,mimeType", + supportsAllDrives=True, + ).execute() + return converted["id"], converted, True + + return file_id, meta, False + + + +def docs_get(docs_service, doc_id: str): + return docs_service.documents().get(documentId=doc_id).execute() + + +# ----------------------------------------------------------- +# BUSINESS LOGIC +# ----------------------------------------------------------- +def obtener_textos_reemplazo(firma_digital: str): + firma_digital = (firma_digital or "").strip().upper() + + if firma_digital == "V": + texto_clausula = ( + "Forman parte integrante e inseparable del presente contrato los Anexos " + "que se detallan a continuación, los cuales se consideran plenamente " + "conocidos y aceptados por las partes con la sola suscripción del " + "presente instrumento. " + ) + texto_firmas_secundarias = "" + else: + texto_clausula = ( + "Porción del presente contrato debiendo ser firmados como parte " + "integrante los siguientes ANEXOS:\n" + ) + texto_firmas_secundarias = ( + "___________________________________\t\t_________________________________\n" + "COMPRADOR\t\t\t\t\t\tVENDEDOR" + ) + + return texto_clausula, texto_firmas_secundarias + + + +def obtener_texto_e_indices(documento: dict): + texto = [] + indices = [] + + def recorrer_elementos(elementos): + for el in elementos or []: + parrafo = el.get("paragraph") + if parrafo: + for pe in parrafo.get("elements", []): + text_run = pe.get("textRun") + if not text_run: + continue + contenido = text_run.get("content", "") + start_index = pe.get("startIndex") + if start_index is None: + continue + for i, ch in enumerate(contenido): + texto.append(ch) + indices.append(start_index + i) + + tabla = el.get("table") + if tabla: + for fila in tabla.get("tableRows", []): + for celda in fila.get("tableCells", []): + recorrer_elementos(celda.get("content", [])) + + toc = el.get("tableOfContents") + if toc: + recorrer_elementos(toc.get("content", [])) + + recorrer_elementos((documento.get("body") or {}).get("content", [])) + return "".join(texto), indices + + + +def quitar_negrita_marker_y_siguiente_char(docs_service, document_id: str, marker_text: str): + documento = docs_get(docs_service, document_id) + texto_completo, indices = obtener_texto_e_indices(documento) + + if not marker_text: + return 0 + + requests = [] + pos = 0 + while True: + idx = texto_completo.find(marker_text, pos) + if idx == -1: + break + + start_index = indices[idx] + end_pos = idx + len(marker_text) + if end_pos < len(indices): + end_index = indices[end_pos] + 1 + else: + end_index = indices[end_pos - 1] + 1 + + requests.append({ + "updateTextStyle": { + "range": { + "startIndex": start_index, + "endIndex": end_index, + }, + "textStyle": { + "bold": False, + }, + "fields": "bold", + } + }) + pos = idx + len(marker_text) + + if requests: + docs_service.documents().batchUpdate( + documentId=document_id, + body={"requests": requests}, + ).execute() + + return len(requests) + + +def reemplazar_clausula_y_firmas(docs_service, document_id: str, firma_digital: str, + marker_clausula: str = "CLAUSULA_FIRMA_DIGITAL", + marker_firmas: str = "FIRMAS_SECUNDARIAS"): + texto_clausula, texto_firmas_secundarias = obtener_textos_reemplazo(firma_digital) + + requests = [ + { + "replaceAllText": { + "containsText": { + "text": marker_clausula, + "matchCase": True, + }, + "replaceText": texto_clausula, + } + }, + { + "replaceAllText": { + "containsText": { + "text": marker_firmas, + "matchCase": True, + }, + "replaceText": texto_firmas_secundarias, + } + }, + ] + + return docs_service.documents().batchUpdate( + documentId=document_id, + body={"requests": requests}, + ).execute() + + +# Rocketbot quirk FIX +globals().update(locals()) + + +# ----------------------------------------------------------- +# MAIN +# ----------------------------------------------------------- +try: + _sv("error_firma_digital", "") + + url_doc = _gvs("current_url", "") + if url_doc == "": + raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.") + + _sv("debug_current_url_var", url_doc) + + raw_id = extract_doc_id_from_url(url_doc) + if not raw_id: + raise RuntimeError("No pude extraer documentId de la URL.") + + _sv("gdoc_original_id", raw_id) + + cred_path = _gvs("gdoc_sa_json", "") + if cred_path == "": + raise RuntimeError("Falta la variable gdoc_sa_json.") + + if not os.path.isabs(cred_path): + cred_path = os.path.join(base_dir, cred_path) + + if not os.path.exists(cred_path): + raise RuntimeError("No existe credentials.json: " + cred_path) + + token_path = _gvs("gdoc_token_json", "") + if token_path == "": + token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") + if not os.path.isabs(token_path): + token_path = os.path.join(base_dir, token_path) + + docs_service, drive_service, auth_mode = get_services(cred_path, token_path) + _sv("gdoc_auth_mode", auth_mode) + + doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) + _sv("gdoc_id", doc_id) + _sv("gdoc_converted", "1" if converted else "0") + _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit") + _sv("gdoc_name", meta_file.get("name", "")) + + doc_before = docs_get(docs_service, doc_id) + _sv("gdoc_revision_before", doc_before.get("revisionId", "")) + + firma_digital = _gvs("firma_digital", "") + marker_clausula = _gvs("marker_clausula_firma_digital", "CLAUSULA_FIRMA_DIGITAL") + marker_firmas = _gvs("marker_firmas_secundarias", "FIRMAS_SECUNDARIAS") + + if firma_digital == "": + raise RuntimeError("Falta la variable firma_digital.") + + _sv("firma_digital_usada", firma_digital) + _sv("marker_clausula_firma_digital_usado", marker_clausula) + _sv("marker_firmas_secundarias_usado", marker_firmas) + + ocurrencias_desnegritadas = quitar_negrita_marker_y_siguiente_char( + docs_service=docs_service, + document_id=doc_id, + marker_text=marker_clausula, + ) + _sv("marker_clausula_desnegritado_previo", str(ocurrencias_desnegritadas)) + + resultado = reemplazar_clausula_y_firmas( + docs_service=docs_service, + document_id=doc_id, + firma_digital=firma_digital, + marker_clausula=marker_clausula, + marker_firmas=marker_firmas, + ) + + replies = resultado.get("replies") or [] + ocurrencias_clausula = 0 + ocurrencias_firmas = 0 + + if len(replies) > 0: + ocurrencias_clausula = int(((replies[0] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0)) + if len(replies) > 1: + ocurrencias_firmas = int(((replies[1] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0)) + + _sv("reemplazos_clausula_firma_digital", str(ocurrencias_clausula)) + _sv("reemplazos_firmas_secundarias", str(ocurrencias_firmas)) + _sv("status_proceso", "Cláusula y firmas reemplazadas correctamente") + + print("Reemplazos realizados correctamente.") + print(resultado) + +except Exception as e: + import traceback + + error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}" + print(error_msg) + _sv("status_proceso", "ERROR") + _sv("error_firma_digital", error_msg) + raise