from __future__ import annotations import warnings warnings.simplefilter("ignore", ResourceWarning) import os import sys import json import re import ast import unicodedata from typing import Any, Dict, List, Tuple # -------------------- # Boot libs (Rocketbot portable) # -------------------- base_dir = os.path.dirname(sys.executable) libs_dir = os.path.join(base_dir, "py_libs", "py310") sys.path.insert(0, libs_dir) for k in list(sys.modules.keys()): if k == "pyparsing" or k.startswith("pyparsing."): del sys.modules[k] from googleapiclient.discovery import build from google.oauth2 import service_account # ----------------------------------------------------------- # GOOGLE AUTH # ----------------------------------------------------------- # Este placeholder queda igual para Rocketbot. SCOPES = {scopes_api_google} MARCADOR_DEFAULT = "DETALLES_TECNICOS" SIMPLE_BULLET_PRESET = "BULLET_DISC_CIRCLE_SQUARE" # ----------------------------------------------------------- # ROCKETBOT VAR HELPERS # ----------------------------------------------------------- def _sv(name, value): try: SetVar(name, value) except Exception: pass def _missing(v) -> bool: if v is None: return True if isinstance(v, str): s = v.strip() return s == "" or s == "ERROR_NOT_VAR" return False def _gv(name, default=None): try: v = GetVar(name) except Exception: return default return default if _missing(v) else v def _gvs(name, default="") -> str: v = _gv(name, default) if v is None: return default s = str(v).strip() return default if (s == "" or s == "ERROR_NOT_VAR") else s # ----------------------------------------------------------- # GOOGLE HELPERS # ----------------------------------------------------------- def _load_json(path): with open(path, "r", encoding="utf-8") as f: return json.load(f) def get_services(credentials_json_path, impersonated_user): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get("type") != "service_account": raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: raise RuntimeError("Falta la variable gdoc_impersonated_user.") creds = service_account.Credentials.from_service_account_file( credentials_json_path, scopes=SCOPES, subject=impersonated_user, ) docs = build("docs", "v1", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False) return docs, drive, "service_account_impersonated" def extract_doc_id_from_url(url: str) -> str: m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") return m.group(1) if m else "" def ensure_docs_api_compatible(drive_service, file_id: str): meta = drive_service.files().get( fileId=file_id, fields="id,name,mimeType,shortcutDetails", supportsAllDrives=True, ).execute() if meta.get("mimeType") == "application/vnd.google-apps.shortcut": target = (meta.get("shortcutDetails") or {}).get("targetId", "") if not target: raise RuntimeError("Es shortcut pero no trae targetId.") file_id = target meta = drive_service.files().get( fileId=file_id, fields="id,name,mimeType", supportsAllDrives=True, ).execute() if meta.get("mimeType") != "application/vnd.google-apps.document": new_name = (meta.get("name") or "Documento") + " (Google Docs)" converted = drive_service.files().copy( fileId=file_id, body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, fields="id,name,mimeType", supportsAllDrives=True, ).execute() return converted["id"], converted, True return file_id, meta, False def docs_get(docs_service, doc_id: str): return docs_service.documents().get(documentId=doc_id).execute() # ----------------------------------------------------------- # DOC STRUCTURE HELPERS # ----------------------------------------------------------- def _norm_alnum(s: str) -> str: if s is None: return "" s = unicodedata.normalize("NFKD", str(s)).lower() out = [] for ch in s: if unicodedata.category(ch) == "Mn": continue if ch.isalnum(): out.append(ch) return "".join(out) def _walk_tabs(tabs): if not tabs: return for t in tabs: yield t for x in _walk_tabs(t.get("childTabs") or []): yield x def _get_tab_body_content(doc, tab_id): tabs = doc.get("tabs") if not tabs: return (doc.get("body") or {}).get("content", []) for t in _walk_tabs(tabs): tid = ((t.get("tabProperties") or {}).get("tabId")) or "" if tid == (tab_id or ""): dtab = t.get("documentTab") or {} return (dtab.get("body") or {}).get("content", []) return [] def _iter_text_chars_from_body(doc, tab_id): content = _get_tab_body_content(doc, tab_id) def walk(content_list): for el in content_list or []: if not isinstance(el, dict): continue p = el.get("paragraph") if p: for pe in p.get("elements", []): tr = pe.get("textRun") if not tr: continue txt = tr.get("content", "") st = pe.get("startIndex") if st is None or txt is None: continue for i, ch in enumerate(txt): yield int(st) + i, ch continue t = el.get("table") if t: for row in t.get("tableRows", []): for cell in row.get("tableCells", []): yield from walk(cell.get("content", [])) continue toc = el.get("tableOfContents") if toc: yield from walk(toc.get("content", [])) continue yield from walk(content) def find_marker_in_body(doc, marker: str): target = _norm_alnum(marker) if not target: return None tabs = doc.get("tabs") tab_ids = [None] if tabs: tab_ids = [] for t in _walk_tabs(tabs): tid = ((t.get("tabProperties") or {}).get("tabId")) or "" tab_ids.append(tid) for tab_id in tab_ids: norm_chars = [] norm_to_docidx = [] for doc_i, ch in _iter_text_chars_from_body(doc, tab_id): if ch.isalnum(): norm_chars.append(ch.lower()) norm_to_docidx.append(int(doc_i)) norm_text = "".join(norm_chars) pos = norm_text.find(target) if pos != -1: start_doc = norm_to_docidx[pos] end_doc = norm_to_docidx[pos + len(target) - 1] + 1 return { "tabId": tab_id, "start": int(start_doc), "end": int(end_doc), } return None def _loc(index, tab_id=None): d = {"index": int(index)} if tab_id: d["tabId"] = tab_id return d def _range(start, end, tab_id=None): d = {"startIndex": int(start), "endIndex": int(end)} if tab_id: d["tabId"] = tab_id return d # ----------------------------------------------------------- # DATA PARSING / FORMAT HELPERS # ----------------------------------------------------------- def normalizar_espacios(texto: str) -> str: return re.sub(r"\s+", " ", str(texto or "").strip()) def parse_especificaciones(raw: Any) -> List[Dict[str, str]]: if isinstance(raw, list): return raw if raw is None: return [] texto = str(raw).strip() if not texto: return [] try: data = json.loads(texto) if isinstance(data, list): return data except Exception: pass try: data = ast.literal_eval(texto) if isinstance(data, list): return data except Exception: pass raise RuntimeError("No se pudo interpretar especificaciones_tecnicas como JSON ni como lista Python.") def dividir_en_bullets(texto: str) -> List[str]: texto = normalizar_espacios(texto) if not texto: return [] partes = re.split(r"(?<=[.!?])\s+(?=(?:[A-ZÁÉÍÓÚÑÜ]|\d))", texto) return [p.strip() for p in partes if p.strip()] def construir_bloque_formateado(especificaciones_tecnicas: List[Dict[str, str]]): partes: List[str] = [] rangos_titulos: List[Tuple[int, int]] = [] rangos_bullets: List[Tuple[int, int]] = [] cursor = 0 items_validos = [] for item in especificaciones_tecnicas or []: clave = normalizar_espacios((item or {}).get("clave", "")) valor = (item or {}).get("valor", "") if clave: items_validos.append({"clave": clave, "valor": valor}) for i, item in enumerate(items_validos): titulo_completo = item["clave"] bullets = dividir_en_bullets(item.get("valor", "")) ini_titulo = cursor partes.append(titulo_completo) cursor += len(titulo_completo) fin_titulo = cursor rangos_titulos.append((ini_titulo, fin_titulo)) partes.append("\n") cursor += 1 for bullet in bullets: ini_bullet = cursor partes.append(bullet) cursor += len(bullet) fin_bullet = cursor rangos_bullets.append((ini_bullet, fin_bullet)) partes.append("\n") cursor += 1 if i < len(items_validos) - 1: partes.append("\n") cursor += 1 return "".join(partes), rangos_titulos, rangos_bullets # ----------------------------------------------------------- # MAIN ACTION # ----------------------------------------------------------- def reemplazar_detalles_tecnicos( docs_service, document_id: str, marcador: str, especificaciones_tecnicas: List[Dict[str, str]], ): documento = docs_get(docs_service, document_id) hit = find_marker_in_body(documento, marcador) if not hit: raise RuntimeError(f"No se encontró el marcador en el BODY del documento: {marcador}") marcador_inicio = int(hit["start"]) marcador_fin = int(hit["end"]) tab_id = hit.get("tabId") texto_insertar, rangos_titulos, rangos_bullets = construir_bloque_formateado(especificaciones_tecnicas) if not texto_insertar.strip(): raise RuntimeError("especificaciones_tecnicas está vacío o no contiene elementos válidos.") docs_service.documents().batchUpdate( documentId=document_id, body={ "requests": [ { "deleteContentRange": { "range": _range(marcador_inicio, marcador_fin, tab_id) } } ] }, ).execute() docs_service.documents().batchUpdate( documentId=document_id, body={ "requests": [ { "insertText": { "location": _loc(marcador_inicio, tab_id), "text": texto_insertar, } } ] }, ).execute() requests_estilo = [] # 1) cada título queda como texto normal del documento, con su número real incluido y en negrita. for ini, fin in rangos_titulos: requests_estilo.append( { "updateTextStyle": { "range": _range(marcador_inicio + ini, marcador_inicio + fin, tab_id), "textStyle": {"bold": True}, "fields": "bold", } } ) # 2) solo los detalles van con viñeta simple. for ini, fin in rangos_bullets: requests_estilo.append( { "createParagraphBullets": { "range": _range(marcador_inicio + ini, marcador_inicio + fin + 1, tab_id), "bulletPreset": SIMPLE_BULLET_PRESET, } } ) if requests_estilo: docs_service.documents().batchUpdate( documentId=document_id, body={"requests": requests_estilo}, ).execute() return { "ok": True, "document_id": document_id, "marcador": marcador, "titulos_formateados": len(rangos_titulos), "bullets_formateados": len(rangos_bullets), "texto_insertado": texto_insertar, } # Rocketbot quirk FIX globals().update(locals()) # ----------------------------------------------------------- # MAIN # ----------------------------------------------------------- try: _sv("error_detalles_tecnicos", "") url_doc = _gvs("current_url", "") if url_doc == "": raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.") raw_id = extract_doc_id_from_url(url_doc) if not raw_id: raise RuntimeError("No pude extraer documentId de la URL.") cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": raise RuntimeError("Falta la variable gdoc_sa_json.") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) if not os.path.exists(cred_path): raise RuntimeError("No existe JSON de cuenta de servicio: " + cred_path) impersonated_user = _gvs("gdoc_impersonated_user", "") docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user) _sv("gdoc_auth_mode", auth_mode) _sv("gdoc_impersonated_user_used", impersonated_user) doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) _sv("gdoc_id", doc_id) _sv("gdoc_converted", "1" if converted else "0") _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit") _sv("gdoc_name", meta_file.get("name", "")) marcador = _gvs("marker_detalles_tecnicos", MARCADOR_DEFAULT) _sv("gdoc_marker_detalles_tecnicos_used", marcador) raw_especificaciones = globals().get("especificaciones_tecnicas", None) if raw_especificaciones is None: raw_especificaciones = _gv("especificaciones_tecnicas", None) especificaciones_tecnicas = parse_especificaciones(raw_especificaciones) resultado = reemplazar_detalles_tecnicos( docs_service=docs_service, document_id=doc_id, marcador=marcador, especificaciones_tecnicas=especificaciones_tecnicas, ) _sv("status_proceso", "DETALLES_TECNICOS insertado correctamente") _sv("detalles_tecnicos_insertados", "1") _sv("detalles_tecnicos_resultado", json.dumps(resultado, ensure_ascii=False)) print("DETALLES_TECNICOS insertado correctamente") print(json.dumps(resultado, ensure_ascii=False, indent=2)) except Exception as e: import traceback error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}" print(error_msg) _sv("status_proceso", "ERROR") _sv("error_detalles_tecnicos", error_msg) raise