# NOTE(review): SOURCE was a git diff creating agregar_planos.py (new file,
# 621 lines). Reconstructed here as the plain Python payload with conventional
# formatting; diff scaffolding (diff --git / index / @@ hunks / '+' prefixes)
# dropped. All code tokens and runtime strings are unchanged.
#
# Purpose: Rocketbot script that opens a Google Doc (taken from the browser's
# current URL), finds a text marker in the document BODY, and replaces it with
# two inline images ("planos": product plan + floor plan) scaled to fit the
# page. Progress/diagnostics are reported back to Rocketbot via SetVar.
#
# NOTE(review): GetVar/SetVar are Rocketbot-injected globals, and the
# {scopes_api_google} / {current_url} tokens below are Rocketbot template
# placeholders substituted textually BEFORE this script is exec'd — this file
# is not valid standalone Python until that substitution happens.

from __future__ import annotations

import os
import sys
import json
import re
import unicodedata
import datetime as dt
from decimal import Decimal
from urllib.request import Request, urlopen

# --------------------
# Boot libs (Rocketbot portable)
# --------------------
# Rocketbot ships a portable Python; third-party wheels live next to the
# executable under py_libs\py310. Prepend so they win over anything bundled.
base_dir = os.path.dirname(sys.executable)  # ...\Rocketbot
libs_dir = os.path.join(base_dir, "py_libs", "py310")
sys.path.insert(0, libs_dir)

# Rocketbot sometimes already has an old pyparsing loaded in memory; purge it
# so the googleapiclient import chain picks up the vendored version instead.
for k in list(sys.modules.keys()):
    if k == "pyparsing" or k.startswith("pyparsing."):
        del sys.modules[k]

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request as GRequest

# --------------------
# Rocketbot vars helpers
# --------------------
def _missing(v) -> bool:
    """Return True when a Rocketbot variable value counts as 'not set'.

    Rocketbot returns the literal string "ERROR_NOT_VAR" for undefined
    variables, so that sentinel (and blank strings / None) is treated as missing.
    """
    if v is None:
        return True
    if isinstance(v, str):
        s = v.strip()
        return s == "" or s == "ERROR_NOT_VAR"
    return False


def _gv(name, default=None):
    """Read Rocketbot variable `name`; fall back to `default` on error/missing."""
    try:
        v = GetVar(name)  # GetVar is injected by the Rocketbot runtime
    except Exception:
        return default
    return default if _missing(v) else v


def _gvs(name, default="") -> str:
    """Read Rocketbot variable `name` as a stripped string, with default."""
    v = _gv(name, default)
    if v is None:
        return default
    s = str(v).strip()
    return default if (s == "" or s == "ERROR_NOT_VAR") else s


def _sv(name, value):
    """Best-effort SetVar: never let a reporting failure kill the script."""
    try:
        SetVar(name, value)  # SetVar is injected by the Rocketbot runtime
    except Exception:
        pass

# --------------------
# Google Auth
# --------------------
# Rocketbot template placeholder: replaced with a Python list/tuple of scope
# URLs before execution. As raw Python this line would be a set-literal over an
# undefined name — it is only valid post-substitution.
SCOPES = {scopes_api_google}


def _load_json(path):
    """Load a UTF-8 JSON file and return the parsed object."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def get_services(credentials_json_path, token_json_path):
    """Build authenticated Docs v1 + Drive v3 service clients.

    Supports two credential shapes:
      * service-account JSON ("type": "service_account") — used directly;
      * OAuth "installed app" client secrets — cached user token at
        `token_json_path`, refreshed when expired, interactive flow otherwise
        (local server first, console fallback).

    Returns (docs_service, drive_service, auth_mode) where auth_mode is
    "service_account" or "oauth".
    """
    info = _load_json(credentials_json_path)

    # Service Account
    if isinstance(info, dict) and info.get("type") == "service_account":
        creds = service_account.Credentials.from_service_account_file(
            credentials_json_path, scopes=SCOPES
        )
        docs = build("docs", "v1", credentials=creds, cache_discovery=False)
        drive = build("drive", "v3", credentials=creds, cache_discovery=False)
        return docs, drive, "service_account"

    # OAuth Desktop
    creds = None
    if os.path.exists(token_json_path):
        creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)

    if (not creds) or (not creds.valid):
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(GRequest())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
            try:
                creds = flow.run_local_server(port=0)
            except Exception:
                # NOTE(review): run_console() was removed in newer
                # google-auth-oauthlib releases — this fallback assumes the
                # bundled version still provides it; confirm.
                creds = flow.run_console()

        # Persist the (new or refreshed) token for next runs.
        os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
        with open(token_json_path, "w", encoding="utf-8") as f:
            f.write(creds.to_json())

    docs = build("docs", "v1", credentials=creds, cache_discovery=False)
    drive = build("drive", "v3", credentials=creds, cache_discovery=False)
    return docs, drive, "oauth"

# --------------------
# Drive helpers
# --------------------
def extract_doc_id_from_url(url):
    """Extract the documentId from a docs.google.com /document/d/<id> URL ('' if none)."""
    m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "")
    return m.group(1) if m else ""


def ensure_docs_api_compatible(drive_service, file_id: str):
    """Resolve `file_id` to something the Docs API can edit.

    If the ID is a shortcut, follow it to its target. If the target is NOT a
    native Google Doc (e.g. a .docx), create a copy converted to
    application/vnd.google-apps.document and return the new ID.

    Returns (file_id, file_metadata, converted_flag).
    """
    meta = drive_service.files().get(
        fileId=file_id,
        fields="id,name,mimeType,shortcutDetails",
        supportsAllDrives=True
    ).execute()

    # Shortcut -> target
    if meta.get("mimeType") == "application/vnd.google-apps.shortcut":
        target = (meta.get("shortcutDetails") or {}).get("targetId", "")
        if not target:
            raise RuntimeError("Es shortcut pero no trae targetId.")
        file_id = target
        meta = drive_service.files().get(
            fileId=file_id,
            fields="id,name,mimeType",
            supportsAllDrives=True
        ).execute()

    # Convert if not native Google Doc
    if meta.get("mimeType") != "application/vnd.google-apps.document":
        new_name = (meta.get("name") or "Documento") + " (Google Docs)"
        converted = drive_service.files().copy(
            fileId=file_id,
            body={"name": new_name, "mimeType": "application/vnd.google-apps.document"},
            fields="id,name,mimeType",
            supportsAllDrives=True
        ).execute()
        return converted["id"], converted, True

    return file_id, meta, False

# --------------------
# Docs GET (tabs)
# --------------------
def docs_get(docs_service, doc_id: str):
    """Fetch the document; request tab content when the client supports it.

    includeTabsContent=True is needed for docs that use Tabs; older
    googleapiclient builds raise TypeError on the unknown kwarg, so fall back.
    """
    try:
        return docs_service.documents().get(documentId=doc_id, includeTabsContent=True).execute()
    except TypeError:
        return docs_service.documents().get(documentId=doc_id).execute()


def _walk_tabs(tabs):
    """Depth-first generator over a tab list, including nested childTabs."""
    if not tabs:
        return
    for t in tabs:
        yield t
        for x in _walk_tabs(t.get("childTabs") or []):
            yield x


def _get_tab_body_content(doc, tab_id):
    """Return the body content list for `tab_id` (or the doc body if no tabs)."""
    tabs = doc.get("tabs")
    if not tabs:
        return (doc.get("body") or {}).get("content", [])
    for t in _walk_tabs(tabs):
        tid = ((t.get("tabProperties") or {}).get("tabId")) or ""
        if tid == (tab_id or ""):
            dtab = t.get("documentTab") or {}
            return (dtab.get("body") or {}).get("content", [])
    return []

# --------------------
# Marker search (BODY only, robust)
# --------------------
def _is_alnum(ch: str) -> bool:
    return ch.isalnum()


def _norm_alnum(s: str) -> str:
    """Normalize to lowercase alphanumerics only (NFKD, accents stripped).

    Used so the marker matches regardless of accents, spacing or punctuation.
    """
    if s is None:
        return ""
    s = unicodedata.normalize("NFKD", str(s)).lower()
    out = []
    for ch in s:
        if unicodedata.category(ch) == "Mn":  # drop combining marks (accents)
            continue
        if ch.isalnum():
            out.append(ch)
    return "".join(out)


def _iter_text_chars_from_body(doc, tab_id):
    """
    Yield (docIndex:int, ch:str) in order, from the BODY only.

    Recurses into tables and tables-of-contents; headers/footers/footnotes are
    deliberately skipped.
    """
    content = _get_tab_body_content(doc, tab_id)

    def walk(content_list):
        for el in content_list or []:
            if not isinstance(el, dict):
                continue

            p = el.get("paragraph")
            if p:
                for pe in p.get("elements", []):
                    tr = pe.get("textRun")
                    if not tr:
                        continue
                    txt = tr.get("content", "")
                    st = pe.get("startIndex")
                    if st is None or txt is None:
                        continue
                    # Emit each character with its absolute document index.
                    for i, ch in enumerate(txt):
                        yield (st + i, ch)
                continue

            t = el.get("table")
            if t:
                for row in t.get("tableRows", []):
                    for cell in row.get("tableCells", []):
                        yield from walk(cell.get("content", []))
                continue

            toc = el.get("tableOfContents")
            if toc:
                yield from walk(toc.get("content", []))
                continue

    yield from walk(content)


def find_marker_in_body(doc, marker: str):
    """
    Search for `marker` in the BODY (not headers/footers); supports tabs.
    Matching: normalized alphanumeric (fuzzy) — accents/punctuation ignored.
    Returns dict {tabId, start, end, mode} or None if not found.
    """
    target = _norm_alnum(marker)
    if not target:
        return None

    tabs = doc.get("tabs")
    tab_ids = [None]  # no-tabs docs use the plain body (tab_id None)
    if tabs:
        tab_ids = []
        for t in _walk_tabs(tabs):
            tid = ((t.get("tabProperties") or {}).get("tabId")) or ""
            tab_ids.append(tid)

    for tab_id in tab_ids:
        norm_chars = []
        norm_to_docidx = []  # norm position -> real doc index of that char

        for doc_i, ch in _iter_text_chars_from_body(doc, tab_id):
            if _is_alnum(ch):
                norm_chars.append(ch.lower())
                norm_to_docidx.append(doc_i)

        norm_text = "".join(norm_chars)
        pos = norm_text.find(target)
        if pos != -1:
            # Map the normalized hit back to real doc indexes; `end` is
            # exclusive (hence the +1 past the last matched char).
            start_doc = norm_to_docidx[pos]
            end_doc = norm_to_docidx[pos + len(target) - 1] + 1
            return {"tabId": tab_id, "start": int(start_doc), "end": int(end_doc), "mode": "fuzzy_alnum"}

    return None

# --------------------
# Docs batch helpers (tab-aware)
# --------------------
def _loc(index, tab_id):
    """Build a Docs API Location object, adding tabId only when present."""
    d = {"index": int(index)}
    if tab_id:
        d["tabId"] = tab_id
    return d


def _range(start, end, tab_id):
    """Build a Docs API Range object, adding tabId only when present."""
    d = {"startIndex": int(start), "endIndex": int(end)}
    if tab_id:
        d["tabId"] = tab_id
    return d


def _dim_pt(x: float):
    """Build a Docs API Dimension in points."""
    return {"magnitude": float(x), "unit": "PT"}


def _batch_update(docs_service, doc_id, reqs, chunk=900):
    """Send requests in chunks (Docs batchUpdate has per-call request limits)."""
    for i in range(0, len(reqs), chunk):
        docs_service.documents().batchUpdate(
            documentId=doc_id,
            body={"requests": reqs[i:i + chunk]}
        ).execute()

# --------------------
# Page box (max size)
# --------------------
def _get_pt(dimension_obj, default_val):
    """Extract magnitude (points) from a Dimension dict, with safe default."""
    try:
        if not dimension_obj:
            return float(default_val)
        mag = dimension_obj.get("magnitude", default_val)
        return float(mag)
    except Exception:
        return float(default_val)


def get_page_limits_pt(doc):
    """
    Return (max_w_pt, max_h_pt) based on pageSize minus margins.
    Falls back to Letter 8.5x11in with 1in margins when absent.
    """
    ds = doc.get("documentStyle") or {}
    ps = ds.get("pageSize") or {}
    w_pt = _get_pt(ps.get("width"), 612.0)   # 8.5in * 72
    h_pt = _get_pt(ps.get("height"), 792.0)  # 11in * 72

    ml = _get_pt(ds.get("marginLeft"), 72.0)
    mr = _get_pt(ds.get("marginRight"), 72.0)
    mt = _get_pt(ds.get("marginTop"), 72.0)
    mb = _get_pt(ds.get("marginBottom"), 72.0)

    # -12pt safety slack; never report an unusably small box (<100pt).
    max_w = max(100.0, w_pt - ml - mr - 12.0)
    max_h = max(100.0, h_pt - mt - mb - 12.0)
    return max_w, max_h

# --------------------
# Image URL + dimensions
# --------------------
def extract_drive_file_id(url: str) -> str:
    """Pull a Drive file id out of /d/<id> or ?id=<id> URL shapes ('' if none)."""
    if not url:
        return ""
    # /file/d//... or /d//...
    m = re.search(r"/d/([a-zA-Z0-9_-]{10,})", url)
    if m:
        return m.group(1)
    # ?id=
    m = re.search(r"[?&]id=([a-zA-Z0-9_-]{10,})", url)
    if m:
        return m.group(1)
    return ""


def sniff_png_wh(data: bytes):
    # PNG: width/height live in IHDR (bytes 16..24), big-endian.
    if len(data) >= 24 and data[:8] == b"\x89PNG\r\n\x1a\n":
        w = int.from_bytes(data[16:20], "big")
        h = int.from_bytes(data[20:24], "big")
        return w, h
    return None


def sniff_gif_wh(data: bytes):
    # GIF: logical screen size at bytes 6..10, little-endian.
    if len(data) >= 10 and (data[:6] in (b"GIF87a", b"GIF89a")):
        w = int.from_bytes(data[6:8], "little")
        h = int.from_bytes(data[8:10], "little")
        return w, h
    return None


def sniff_jpeg_wh(data: bytes):
    # JPEG: scan segments for an SOFx marker carrying the frame dimensions.
    if len(data) < 4 or data[0:2] != b"\xFF\xD8":
        return None
    i = 2
    n = len(data)
    while i + 9 < n:
        if data[i] != 0xFF:
            i += 1
            continue
        # skip FF padding
        while i < n and data[i] == 0xFF:
            i += 1
        if i >= n:
            break
        marker = data[i]
        i += 1
        # markers without a length field
        if marker in (0xD8, 0xD9):
            continue
        if i + 1 >= n:
            break
        seglen = int.from_bytes(data[i:i+2], "big")
        if seglen < 2 or i + seglen > n:
            break
        # SOF0..SOF3, SOF5..SOF7, SOF9..SOF11, SOF13..SOF15
        if marker in (0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF):
            # segment data: [len:2][precision:1][height:2][width:2]...
            if i + 7 < n:
                h = int.from_bytes(data[i+3:i+5], "big")
                w = int.from_bytes(data[i+5:i+7], "big")
                return w, h
        i += seglen
    return None


def sniff_image_wh_from_url(url: str, timeout=20):
    """
    Try to read only the header (Range request) to detect image dimensions.
    Returns (w, h) or (None, None) when the format is not recognized.
    """
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Range": "bytes=0-65535",
    }
    req = Request(url, headers=headers)
    with urlopen(req, timeout=timeout) as resp:
        data = resp.read(65536)

    for fn in (sniff_png_wh, sniff_gif_wh, sniff_jpeg_wh):
        wh = fn(data)
        if wh:
            return wh[0], wh[1]
    return None, None


def resolve_image(drive_service, url: str):
    """
    Return (uri_for_docs, w_px, h_px, source).
    For Drive links whose metadata cannot be accessed/read, uri="" (not found).
    """
    if not url:
        return ("", None, None, "empty")

    fid = extract_drive_file_id(url)
    if fid:
        try:
            meta = drive_service.files().get(
                fileId=fid,
                fields="id,mimeType,imageMediaMetadata(width,height)",
                supportsAllDrives=True
            ).execute()

            im = meta.get("imageMediaMetadata") or {}
            w = im.get("width")
            h = im.get("height")

            # Direct-download URL form the Docs API can fetch.
            uri = f"https://drive.google.com/uc?export=download&id={fid}"
            return (uri, w, h, "drive")

        except Exception as e:
            return ("", None, None, f"drive_error:{type(e).__name__}")

    # External URL: best-effort header sniff for dimensions.
    w = h = None
    try:
        w, h = sniff_image_wh_from_url(url)
    except Exception:
        w, h = None, None

    return (url, w, h, "url")


def fit_object_size_pt(w_px, h_px, max_w_pt, max_h_pt):
    """
    Convert px->pt using 0.75 (96dpi->72pt) and scale down (never up) so the
    image does not exceed max_w/max_h. Unknown dimensions fall back to a safe
    full-width 4:3 box.
    """
    if not w_px or not h_px:
        # safe fallback
        w_pt = float(max_w_pt)
        h_pt = float(min(max_h_pt, max_w_pt * 0.75))
        return w_pt, h_pt

    w_pt0 = float(w_px) * 0.75
    h_pt0 = float(h_px) * 0.75
    if w_pt0 <= 0 or h_pt0 <= 0:
        w_pt = float(max_w_pt)
        h_pt = float(min(max_h_pt, max_w_pt * 0.75))
        return w_pt, h_pt

    # Preserve aspect ratio; cap scale at 1.0 so small images are not enlarged.
    scale = min(max_w_pt / w_pt0, max_h_pt / h_pt0, 1.0)
    return w_pt0 * scale, h_pt0 * scale

# --------------------
# Insert planos at marker
# --------------------
def insert_planos_at_marker(docs_service, drive_service, doc_id: str, marker: str, url1: str, url2: str):
    """Replace `marker` in the doc body with the two plan images, stacked.

    url1 = product plan, url2 = floor plan. Reports progress/diagnostics to
    Rocketbot via SetVar (plano1_found/plano2_found and gdoc_planos_* vars);
    raises RuntimeError when the marker or either image cannot be resolved.
    """
    # Rocketbot flags: always start as False
    _sv("plano1_found", "False")
    _sv("plano2_found", "False")

    doc = docs_get(docs_service, doc_id)

    hit = find_marker_in_body(doc, marker)
    if not hit:
        _sv("gdoc_planos_marker_found", "0")
        _sv("gdoc_planos_marker", marker)
        raise RuntimeError("No encontre el marcador en el BODY del documento: " + str(marker))

    tab_id = hit.get("tabId")
    start = int(hit["start"])
    end = int(hit["end"])

    _sv("gdoc_planos_marker_found", "1")
    _sv("gdoc_planos_marker_mode", hit.get("mode", ""))
    _sv("gdoc_planos_marker_tabId", tab_id or "")
    _sv("gdoc_planos_marker_start", str(start))
    _sv("gdoc_planos_marker_end", str(end))

    max_w_pt, max_h_pt = get_page_limits_pt(doc)
    _sv("gdoc_planos_max_w_pt", str(max_w_pt))
    _sv("gdoc_planos_max_h_pt", str(max_h_pt))

    # Resolve URLs + dimensions (and set the found-flags)
    uri1, w1, h1, src1 = resolve_image(drive_service, url1)  # product plan
    if uri1:
        _sv("plano1_found", "True")

    uri2, w2, h2, src2 = resolve_image(drive_service, url2)  # floor plan
    if uri2:
        _sv("plano2_found", "True")

    # Validation: if either is missing, abort (Rocketbot can inspect which
    # flags ended up True/False).
    if not uri1 or not uri2:
        if not uri1 and not uri2:
            raise RuntimeError("No se encontró ningún plano (plano 1 y plano 2). Revisá url_plano_producto y url_plano_piso.")
        if not uri1:
            raise RuntimeError("No se encontró el plano 1 (url_plano_producto). Verificá la URL / permisos en Drive.")
        raise RuntimeError("No se encontró el plano 2 (url_plano_piso). Verificá la URL / permisos en Drive.")

    _sv("gdoc_planos_uri1", uri1)
    _sv("gdoc_planos_uri2", uri2)
    _sv("gdoc_planos_src1", src1)
    _sv("gdoc_planos_src2", src2)
    _sv("gdoc_planos_w1_px", "" if w1 is None else str(w1))
    _sv("gdoc_planos_h1_px", "" if h1 is None else str(h1))
    _sv("gdoc_planos_w2_px", "" if w2 is None else str(w2))
    _sv("gdoc_planos_h2_px", "" if h2 is None else str(h2))

    w1_pt, h1_pt = fit_object_size_pt(w1, h1, max_w_pt, max_h_pt)
    w2_pt, h2_pt = fit_object_size_pt(w2, h2, max_w_pt, max_h_pt)

    # Robust construction WITHOUT depending on index+1 arithmetic:
    # every insert targets the same `start`, so later inserts push earlier
    # ones right — final document order is [img1][\n][img2][\n].
    reqs = [
        {"deleteContentRange": {"range": _range(start, end, tab_id)}},

        # 1) trailing newline
        {"insertText": {"location": _loc(start, tab_id), "text": "\n"}},

        # 2) img2 (ends up before the trailing newline)
        {"insertInlineImage": {
            "location": _loc(start, tab_id),
            "uri": uri2,
            "objectSize": {"width": _dim_pt(w2_pt), "height": _dim_pt(h2_pt)},
        }},

        # 3) newline between images (ends up before img2)
        {"insertText": {"location": _loc(start, tab_id), "text": "\n"}},

        # 4) img1 (ends up before the between-images newline)
        {"insertInlineImage": {
            "location": _loc(start, tab_id),
            "uri": uri1,
            "objectSize": {"width": _dim_pt(w1_pt), "height": _dim_pt(h1_pt)},
        }},
    ]

    _batch_update(docs_service, doc_id, reqs)

# Rocketbot quirk FIX: the runtime exec's this script in a scope where names
# defined above are not automatically visible to nested calls; flatten them.
globals().update(locals())

# --------------------
# MAIN
# --------------------
try:
    # Global flags: always start as False (in case we fail before the function)
    _sv("plano1_found", "False")
    _sv("plano2_found", "False")

    marker = _gvs("gdoc_marker_planos", "INSERTAR_PLANOS")

    url1 = _gvs("url_plano_producto", "")
    url2 = _gvs("url_plano_piso", "")

    cred_path = _gvs("gdoc_sa_json", "")
    if cred_path == "":
        raise RuntimeError("Falta gdoc_sa_json (ruta a credentials.json)")
    if not os.path.isabs(cred_path):
        cred_path = os.path.join(base_dir, cred_path)
    if not os.path.exists(cred_path):
        raise RuntimeError("No existe credentials.json: " + cred_path)

    token_path = _gvs("gdoc_token_json", "")
    if token_path == "":
        token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json")
    if not os.path.isabs(token_path):
        token_path = os.path.join(base_dir, token_path)

    _sv("gdoc_credentials_used", cred_path)
    _sv("gdoc_token_used", token_path)

    docs_service, drive_service, auth_mode = get_services(cred_path, token_path)
    _sv("gdoc_auth_mode", auth_mode)

    # doc_id from the browser URL (Rocketbot template substitution)
    url_doc = "{current_url}"
    #if url_doc == "":
    #    url_doc = _gvs("current_url", "")
    #_sv("gdoc_url_used", url_doc)

    doc_id = extract_doc_id_from_url(url_doc)
    if doc_id == "":
        raise RuntimeError("No pude extraer documentId de la URL: " + url_doc)

    doc_id_final, meta_file, converted = ensure_docs_api_compatible(drive_service, doc_id)
    _sv("gdoc_original_id", doc_id)
    _sv("gdoc_id", doc_id_final)
    _sv("gdoc_file_name", meta_file.get("name", ""))
    _sv("gdoc_file_mimeType", meta_file.get("mimeType", ""))
    _sv("gdoc_converted", "1" if converted else "0")
    _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id_final}/edit")

    insert_planos_at_marker(docs_service, drive_service, doc_id_final, marker, url1, url2)

    _sv("gdoc_planos_status", "OK")
    _sv("gdoc_planos_error", "")

except HttpError as e:
    _sv("gdoc_planos_status", "ERROR")
    _sv("gdoc_planos_error", "HttpError: " + str(e))
    raise
except Exception as e:
    _sv("gdoc_planos_status", "ERROR")
    _sv("gdoc_planos_error", str(e))
    raise