from __future__ import annotations import os import sys import json import re import unicodedata import datetime as dt from decimal import Decimal from urllib.request import Request, urlopen # -------------------- # Boot libs (Rocketbot portable) # -------------------- base_dir = os.path.dirname(sys.executable) # ...\Rocketbot libs_dir = os.path.join(base_dir, "py_libs", "py310") sys.path.insert(0, libs_dir) # Rocketbot a veces ya trae pyparsing viejo en memoria for k in list(sys.modules.keys()): if k == "pyparsing" or k.startswith("pyparsing."): del sys.modules[k] from googleapiclient.discovery import build from googleapiclient.errors import HttpError from google.oauth2 import service_account # -------------------- # Rocketbot vars helpers # -------------------- def _missing(v) -> bool: if v is None: return True if isinstance(v, str): s = v.strip() return s == "" or s == "ERROR_NOT_VAR" return False def _gv(name, default=None): try: v = GetVar(name) except Exception: return default return default if _missing(v) else v def _gvs(name, default="") -> str: v = _gv(name, default) if v is None: return default s = str(v).strip() return default if (s == "" or s == "ERROR_NOT_VAR") else s def _sv(name, value): try: SetVar(name, value) except Exception: pass # -------------------- # Google Auth # -------------------- SCOPES = {scopes_api_google} def _load_json(path): with open(path, "r", encoding="utf-8") as f: return json.load(f) def get_services(credentials_json_path, impersonated_user): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get("type") != "service_account": raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: raise RuntimeError("Falta la variable gdoc_impersonated_user.") creds = service_account.Credentials.from_service_account_file( credentials_json_path, scopes=SCOPES, subject=impersonated_user, ) docs = build("docs", "v1", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False) return docs, drive, "service_account_impersonated" # -------------------- # Drive helpers # -------------------- def extract_doc_id_from_url(url): m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") return m.group(1) if m else "" def ensure_docs_api_compatible(drive_service, file_id: str): """ Si el ID es shortcut o NO es Google Docs nativo (ej: docx), crea una copia convertida a application/vnd.google-apps.document y devuelve el nuevo ID. """ meta = drive_service.files().get( fileId=file_id, fields="id,name,mimeType,shortcutDetails", supportsAllDrives=True ).execute() # Shortcut -> target if meta.get("mimeType") == "application/vnd.google-apps.shortcut": target = (meta.get("shortcutDetails") or {}).get("targetId", "") if not target: raise RuntimeError("Es shortcut pero no trae targetId.") file_id = target meta = drive_service.files().get( fileId=file_id, fields="id,name,mimeType", supportsAllDrives=True ).execute() # Convert if not native Google Doc if meta.get("mimeType") != "application/vnd.google-apps.document": new_name = (meta.get("name") or "Documento") + " (Google Docs)" converted = drive_service.files().copy( fileId=file_id, body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, fields="id,name,mimeType", supportsAllDrives=True ).execute() return converted["id"], converted, True return file_id, meta, False # -------------------- # Docs GET (tabs) # -------------------- def docs_get(docs_service, doc_id: str): # includeTabsContent=true para docs con Tabs try: return docs_service.documents().get(documentId=doc_id, includeTabsContent=True).execute() except TypeError: return docs_service.documents().get(documentId=doc_id).execute() def _walk_tabs(tabs): if not tabs: return for t in tabs: yield t for x in _walk_tabs(t.get("childTabs") or []): yield x def _get_tab_body_content(doc, tab_id): tabs = doc.get("tabs") if not tabs: return (doc.get("body") or {}).get("content", []) for t in _walk_tabs(tabs): tid = ((t.get("tabProperties") or {}).get("tabId")) or "" if tid == (tab_id or ""): dtab = t.get("documentTab") or {} return (dtab.get("body") or {}).get("content", []) return [] # -------------------- # Marker search (BODY only, robust) # -------------------- def _is_alnum(ch: str) -> bool: return ch.isalnum() def _norm_alnum(s: str) -> str: if s is None: return "" s = unicodedata.normalize("NFKD", str(s)).lower() out = [] for ch in s: if unicodedata.category(ch) == "Mn": continue if ch.isalnum(): out.append(ch) return "".join(out) def _iter_text_chars_from_body(doc, tab_id): """ Yields (docIndex:int, ch:str) en orden, solo del BODY. """ content = _get_tab_body_content(doc, tab_id) def walk(content_list): for el in content_list or []: if not isinstance(el, dict): continue p = el.get("paragraph") if p: for pe in p.get("elements", []): tr = pe.get("textRun") if not tr: continue txt = tr.get("content", "") st = pe.get("startIndex") if st is None or txt is None: continue for i, ch in enumerate(txt): yield (st + i, ch) continue t = el.get("table") if t: for row in t.get("tableRows", []): for cell in row.get("tableCells", []): yield from walk(cell.get("content", [])) continue toc = el.get("tableOfContents") if toc: yield from walk(toc.get("content", [])) continue yield from walk(content) def find_marker_in_body(doc, marker: str): """ Busca marker en el BODY (no headers/footers), soporta tabs. Matching: alfanumérico normalizado (fuzzy). Retorna dict: {tabId, start, end, mode} """ target = _norm_alnum(marker) if not target: return None tabs = doc.get("tabs") tab_ids = [None] if tabs: tab_ids = [] for t in _walk_tabs(tabs): tid = ((t.get("tabProperties") or {}).get("tabId")) or "" tab_ids.append(tid) for tab_id in tab_ids: norm_chars = [] norm_to_docidx = [] # norm position -> real doc index of that char for doc_i, ch in _iter_text_chars_from_body(doc, tab_id): if _is_alnum(ch): norm_chars.append(ch.lower()) norm_to_docidx.append(doc_i) norm_text = "".join(norm_chars) pos = norm_text.find(target) if pos != -1: start_doc = norm_to_docidx[pos] end_doc = norm_to_docidx[pos + len(target) - 1] + 1 return {"tabId": tab_id, "start": int(start_doc), "end": int(end_doc), "mode": "fuzzy_alnum"} return None # -------------------- # Docs batch helpers (tab-aware) # -------------------- def _loc(index, tab_id): d = {"index": int(index)} if tab_id: d["tabId"] = tab_id return d def _range(start, end, tab_id): d = {"startIndex": int(start), "endIndex": int(end)} if tab_id: d["tabId"] = tab_id return d def _dim_pt(x: float): return {"magnitude": float(x), "unit": "PT"} def _batch_update(docs_service, doc_id, reqs, chunk=900): for i in range(0, len(reqs), chunk): docs_service.documents().batchUpdate( documentId=doc_id, body={"requests": reqs[i:i + chunk]} ).execute() # -------------------- # Page box (max size) # -------------------- def _get_pt(dimension_obj, default_val): try: if not dimension_obj: return float(default_val) mag = dimension_obj.get("magnitude", default_val) return float(mag) except Exception: return float(default_val) def get_page_limits_pt(doc): """ Devuelve (max_w_pt, max_h_pt) basado en pageSize - margins. Fallback a Letter 8.5x11 con 1in margen si no existe. """ ds = doc.get("documentStyle") or {} ps = ds.get("pageSize") or {} w_pt = _get_pt(ps.get("width"), 612.0) # 8.5in * 72 h_pt = _get_pt(ps.get("height"), 792.0) # 11in * 72 ml = _get_pt(ds.get("marginLeft"), 72.0) mr = _get_pt(ds.get("marginRight"), 72.0) mt = _get_pt(ds.get("marginTop"), 72.0) mb = _get_pt(ds.get("marginBottom"), 72.0) max_w = max(100.0, w_pt - ml - mr - 12.0) max_h = max(100.0, h_pt - mt - mb - 12.0) return max_w, max_h # -------------------- # Image URL + dimensions # -------------------- def extract_drive_file_id(url: str) -> str: if not url: return "" # /file/d//... or /d//... m = re.search(r"/d/([a-zA-Z0-9_-]{10,})", url) if m: return m.group(1) # ?id= m = re.search(r"[?&]id=([a-zA-Z0-9_-]{10,})", url) if m: return m.group(1) return "" def sniff_png_wh(data: bytes): # PNG: width/height en IHDR (bytes 16..24) if len(data) >= 24 and data[:8] == b"\x89PNG\r\n\x1a\n": w = int.from_bytes(data[16:20], "big") h = int.from_bytes(data[20:24], "big") return w, h return None def sniff_gif_wh(data: bytes): if len(data) >= 10 and (data[:6] in (b"GIF87a", b"GIF89a")): w = int.from_bytes(data[6:8], "little") h = int.from_bytes(data[8:10], "little") return w, h return None def sniff_jpeg_wh(data: bytes): # JPEG: buscar SOF0/SOF2 if len(data) < 4 or data[0:2] != b"\xFF\xD8": return None i = 2 n = len(data) while i + 9 < n: if data[i] != 0xFF: i += 1 continue # saltar FFs while i < n and data[i] == 0xFF: i += 1 if i >= n: break marker = data[i] i += 1 # markers sin length if marker in (0xD8, 0xD9): continue if i + 1 >= n: break seglen = int.from_bytes(data[i:i+2], "big") if seglen < 2 or i + seglen > n: break # SOF0..SOF3, SOF5..SOF7, SOF9..SOF11, SOF13..SOF15 if marker in (0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF): # data: [len2][precision1][height2][width2]... if i + 7 < n: h = int.from_bytes(data[i+3:i+5], "big") w = int.from_bytes(data[i+5:i+7], "big") return w, h i += seglen return None def sniff_image_wh_from_url(url: str, timeout=20): """ Intenta leer solo cabecera (Range) para detectar dims. """ headers = { "User-Agent": "Mozilla/5.0", "Range": "bytes=0-65535", } req = Request(url, headers=headers) with urlopen(req, timeout=timeout) as resp: data = resp.read(65536) for fn in (sniff_png_wh, sniff_gif_wh, sniff_jpeg_wh): wh = fn(data) if wh: return wh[0], wh[1] return None, None def resolve_image(drive_service, url: str): """ Retorna: (uri_para_docs, w_px, h_px, source) Si es link de Drive y NO se puede acceder/leer metadata, retorna uri="" (no encontrado). """ if not url: return ("", None, None, "empty") fid = extract_drive_file_id(url) if fid: try: meta = drive_service.files().get( fileId=fid, fields="id,mimeType,imageMediaMetadata(width,height)", supportsAllDrives=True ).execute() im = meta.get("imageMediaMetadata") or {} w = im.get("width") h = im.get("height") uri = f"https://drive.google.com/uc?export=download&id={fid}" return (uri, w, h, "drive") except Exception as e: return ("", None, None, f"drive_error:{type(e).__name__}") # URL externa w = h = None try: w, h = sniff_image_wh_from_url(url) except Exception: w, h = None, None return (url, w, h, "url") def fit_object_size_pt(w_px, h_px, max_w_pt, max_h_pt): """ Convierte px->pt usando 0.75 (96dpi->72pt) y escala para no exceder max_w/max_h. """ if not w_px or not h_px: # fallback seguro w_pt = float(max_w_pt) h_pt = float(min(max_h_pt, max_w_pt * 0.75)) return w_pt, h_pt w_pt0 = float(w_px) * 0.75 h_pt0 = float(h_px) * 0.75 if w_pt0 <= 0 or h_pt0 <= 0: w_pt = float(max_w_pt) h_pt = float(min(max_h_pt, max_w_pt * 0.75)) return w_pt, h_pt scale = min(max_w_pt / w_pt0, max_h_pt / h_pt0, 1.0) return w_pt0 * scale, h_pt0 * scale # -------------------- # Insert planos at marker # -------------------- def insert_planos_at_marker(docs_service, drive_service, doc_id: str, marker: str, url1: str, url2: str): # Flags Rocketbot: arrancar siempre en False _sv("plano1_found", "False") _sv("plano2_found", "False") doc = docs_get(docs_service, doc_id) hit = find_marker_in_body(doc, marker) if not hit: _sv("gdoc_planos_marker_found", "0") _sv("gdoc_planos_marker", marker) raise RuntimeError("No encontre el marcador en el BODY del documento: " + str(marker)) tab_id = hit.get("tabId") start = int(hit["start"]) end = int(hit["end"]) _sv("gdoc_planos_marker_found", "1") _sv("gdoc_planos_marker_mode", hit.get("mode", "")) _sv("gdoc_planos_marker_tabId", tab_id or "") _sv("gdoc_planos_marker_start", str(start)) _sv("gdoc_planos_marker_end", str(end)) max_w_pt, max_h_pt = get_page_limits_pt(doc) _sv("gdoc_planos_max_w_pt", str(max_w_pt)) _sv("gdoc_planos_max_h_pt", str(max_h_pt)) # Resolver URLs + dims (y setear flags) uri1, w1, h1, src1 = resolve_image(drive_service, url1) # plano producto if uri1: _sv("plano1_found", "True") uri2, w2, h2, src2 = resolve_image(drive_service, url2) # plano piso if uri2: _sv("plano2_found", "True") # Validación: si falta alguno, cortar (y Rocketbot ya ve cuáles quedaron True/False) if not uri1 or not uri2: if not uri1 and not uri2: raise RuntimeError("No se encontró ningún plano (plano 1 y plano 2). Revisá url_plano_producto y url_plano_piso.") if not uri1: raise RuntimeError("No se encontró el plano 1 (url_plano_producto). Verificá la URL / permisos en Drive.") raise RuntimeError("No se encontró el plano 2 (url_plano_piso). Verificá la URL / permisos en Drive.") _sv("gdoc_planos_uri1", uri1) _sv("gdoc_planos_uri2", uri2) _sv("gdoc_planos_src1", src1) _sv("gdoc_planos_src2", src2) _sv("gdoc_planos_w1_px", "" if w1 is None else str(w1)) _sv("gdoc_planos_h1_px", "" if h1 is None else str(h1)) _sv("gdoc_planos_w2_px", "" if w2 is None else str(w2)) _sv("gdoc_planos_h2_px", "" if h2 is None else str(h2)) w1_pt, h1_pt = fit_object_size_pt(w1, h1, max_w_pt, max_h_pt) w2_pt, h2_pt = fit_object_size_pt(w2, h2, max_w_pt, max_h_pt) # Construcción robusta SIN depender de index+1: # Insertamos SIEMPRE en el mismo start para apilar: # [img1][\n][img2][\n] reqs = [ {"deleteContentRange": {"range": _range(start, end, tab_id)}}, # 1) newline final {"insertText": {"location": _loc(start, tab_id), "text": "\n"}}, # 2) img2 (queda antes del newline final) {"insertInlineImage": { "location": _loc(start, tab_id), "uri": uri2, "objectSize": {"width": _dim_pt(w2_pt), "height": _dim_pt(h2_pt)}, }}, # 3) newline entre imágenes (queda antes de img2) {"insertText": {"location": _loc(start, tab_id), "text": "\n"}}, # 4) img1 (queda antes de newline entre imágenes) {"insertInlineImage": { "location": _loc(start, tab_id), "uri": uri1, "objectSize": {"width": _dim_pt(w1_pt), "height": _dim_pt(h1_pt)}, }}, ] _batch_update(docs_service, doc_id, reqs) # Rocketbot quirk FIX globals().update(locals()) # -------------------- # MAIN # -------------------- try: # Flags globales: arrancar siempre en False (por si fallamos antes de la función) _sv("plano1_found", "False") _sv("plano2_found", "False") marker = _gvs("gdoc_marker_planos", "INSERTAR_PLANOS") url1 = _gvs("url_plano_producto", "") url2 = _gvs("url_plano_piso", "") cred_path = _gvs("gdoc_sa_json2", "") if cred_path == "": raise RuntimeError("Falta gdoc_sa_json2 (ruta al JSON de cuenta de servicio)") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) if not os.path.exists(cred_path): raise RuntimeError("No existe JSON de cuenta de servicio: " + cred_path) impersonated_user = _gvs("gdoc_impersonated_user", "") _sv("gdoc_credentials_used", cred_path) _sv("gdoc_impersonated_user_used", impersonated_user) docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user) _sv("gdoc_auth_mode", auth_mode) # doc_id desde URL del navegador url_doc = "{current_url}" #if url_doc == "": # url_doc = _gvs("current_url", "") #_sv("gdoc_url_used", url_doc) doc_id = extract_doc_id_from_url(url_doc) if doc_id == "": raise RuntimeError("No pude extraer documentId de la URL: " + url_doc) doc_id_final, meta_file, converted = ensure_docs_api_compatible(drive_service, doc_id) _sv("gdoc_original_id", doc_id) _sv("gdoc_id", doc_id_final) _sv("gdoc_file_name", meta_file.get("name", "")) _sv("gdoc_file_mimeType", meta_file.get("mimeType", "")) _sv("gdoc_converted", "1" if converted else "0") _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id_final}/edit") insert_planos_at_marker(docs_service, drive_service, doc_id_final, marker, url1, url2) _sv("gdoc_planos_status", "OK") _sv("gdoc_planos_error", "") except HttpError as e: _sv("gdoc_planos_status", "ERROR") _sv("gdoc_planos_error", "HttpError: " + str(e)) raise except Exception as e: _sv("gdoc_planos_status", "ERROR") _sv("gdoc_planos_error", str(e)) raise