From a35e86c318d8a36a4f5caef1dff04c8e8828eea6 Mon Sep 17 00:00:00 2001 From: dulcefigueredo Date: Wed, 22 Apr 2026 19:35:58 +0000 Subject: [PATCH] Subir archivos a "/" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit añado el script que reemplaza un marcador para el anexo II sobre inventarios --- detalles_tecnicos.py | 547 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 547 insertions(+) create mode 100644 detalles_tecnicos.py diff --git a/detalles_tecnicos.py b/detalles_tecnicos.py new file mode 100644 index 0000000..ade10bb --- /dev/null +++ b/detalles_tecnicos.py @@ -0,0 +1,547 @@ +from __future__ import annotations + +import warnings +warnings.simplefilter("ignore", ResourceWarning) + +import os +import sys +import json +import re +import ast +import unicodedata +from typing import Any, Dict, List, Tuple + +# -------------------- +# Boot libs (Rocketbot portable) +# -------------------- +base_dir = os.path.dirname(sys.executable) +libs_dir = os.path.join(base_dir, "py_libs", "py310") +sys.path.insert(0, libs_dir) + +for k in list(sys.modules.keys()): + if k == "pyparsing" or k.startswith("pyparsing."): + del sys.modules[k] + +from googleapiclient.discovery import build +from google.oauth2 import service_account +from google.oauth2.credentials import Credentials +from google_auth_oauthlib.flow import InstalledAppFlow +from google.auth.transport.requests import Request as GRequest + +# ----------------------------------------------------------- +# GOOGLE AUTH +# ----------------------------------------------------------- +# Este placeholder queda igual para Rocketbot. +SCOPES = {scopes_api_google} + +MARCADOR_DEFAULT = "DETALLES_TECNICOS" +SIMPLE_BULLET_PRESET = "BULLET_DISC_CIRCLE_SQUARE" + + +# ----------------------------------------------------------- +# ROCKETBOT VAR HELPERS +# ----------------------------------------------------------- +def _sv(name, value): + try: + SetVar(name, value) + except Exception: + pass + + +def _missing(v) -> bool: + if v is None: + return True + if isinstance(v, str): + s = v.strip() + return s == "" or s == "ERROR_NOT_VAR" + return False + + +def _gv(name, default=None): + try: + v = GetVar(name) + except Exception: + return default + return default if _missing(v) else v + + +def _gvs(name, default="") -> str: + v = _gv(name, default) + if v is None: + return default + s = str(v).strip() + return default if (s == "" or s == "ERROR_NOT_VAR") else s + + +# ----------------------------------------------------------- +# GOOGLE HELPERS +# ----------------------------------------------------------- +def _load_json(path): + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + + +def get_services(credentials_json_path, token_json_path): + info = _load_json(credentials_json_path) + + if isinstance(info, dict) and info.get("type") == "service_account": + creds = service_account.Credentials.from_service_account_file( + credentials_json_path, + scopes=SCOPES, + ) + docs = build("docs", "v1", credentials=creds, cache_discovery=False) + drive = build("drive", "v3", credentials=creds, cache_discovery=False) + return docs, drive, "service_account" + + creds = None + if os.path.exists(token_json_path): + creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) + + if (not creds) or (not creds.valid): + if creds and creds.expired and creds.refresh_token: + creds.refresh(GRequest()) + else: + flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) + try: + creds = flow.run_local_server(port=0) + except Exception: + creds = flow.run_console() + + os.makedirs(os.path.dirname(token_json_path), exist_ok=True) + with open(token_json_path, "w", encoding="utf-8") as f: + f.write(creds.to_json()) + + docs = build("docs", "v1", credentials=creds, cache_discovery=False) + drive = build("drive", "v3", credentials=creds, cache_discovery=False) + return docs, drive, "oauth" + + + +def extract_doc_id_from_url(url: str) -> str: + m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") + return m.group(1) if m else "" + + + +def ensure_docs_api_compatible(drive_service, file_id: str): + meta = drive_service.files().get( + fileId=file_id, + fields="id,name,mimeType,shortcutDetails", + supportsAllDrives=True, + ).execute() + + if meta.get("mimeType") == "application/vnd.google-apps.shortcut": + target = (meta.get("shortcutDetails") or {}).get("targetId", "") + if not target: + raise RuntimeError("Es shortcut pero no trae targetId.") + file_id = target + meta = drive_service.files().get( + fileId=file_id, + fields="id,name,mimeType", + supportsAllDrives=True, + ).execute() + + if meta.get("mimeType") != "application/vnd.google-apps.document": + new_name = (meta.get("name") or "Documento") + " (Google Docs)" + converted = drive_service.files().copy( + fileId=file_id, + body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, + fields="id,name,mimeType", + supportsAllDrives=True, + ).execute() + return converted["id"], converted, True + + return file_id, meta, False + + + +def docs_get(docs_service, doc_id: str): + return docs_service.documents().get(documentId=doc_id).execute() + + +# ----------------------------------------------------------- +# DOC STRUCTURE HELPERS +# ----------------------------------------------------------- +def _norm_alnum(s: str) -> str: + if s is None: + return "" + s = unicodedata.normalize("NFKD", str(s)).lower() + out = [] + for ch in s: + if unicodedata.category(ch) == "Mn": + continue + if ch.isalnum(): + out.append(ch) + return "".join(out) + + + +def _walk_tabs(tabs): + if not tabs: + return + for t in tabs: + yield t + for x in _walk_tabs(t.get("childTabs") or []): + yield x + + + +def _get_tab_body_content(doc, tab_id): + tabs = doc.get("tabs") + if not tabs: + return (doc.get("body") or {}).get("content", []) + + for t in _walk_tabs(tabs): + tid = ((t.get("tabProperties") or {}).get("tabId")) or "" + if tid == (tab_id or ""): + dtab = t.get("documentTab") or {} + return (dtab.get("body") or {}).get("content", []) + + return [] + + + +def _iter_text_chars_from_body(doc, tab_id): + content = _get_tab_body_content(doc, tab_id) + + def walk(content_list): + for el in content_list or []: + if not isinstance(el, dict): + continue + + p = el.get("paragraph") + if p: + for pe in p.get("elements", []): + tr = pe.get("textRun") + if not tr: + continue + txt = tr.get("content", "") + st = pe.get("startIndex") + if st is None or txt is None: + continue + for i, ch in enumerate(txt): + yield int(st) + i, ch + continue + + t = el.get("table") + if t: + for row in t.get("tableRows", []): + for cell in row.get("tableCells", []): + yield from walk(cell.get("content", [])) + continue + + toc = el.get("tableOfContents") + if toc: + yield from walk(toc.get("content", [])) + continue + + yield from walk(content) + + + +def find_marker_in_body(doc, marker: str): + target = _norm_alnum(marker) + if not target: + return None + + tabs = doc.get("tabs") + tab_ids = [None] + if tabs: + tab_ids = [] + for t in _walk_tabs(tabs): + tid = ((t.get("tabProperties") or {}).get("tabId")) or "" + tab_ids.append(tid) + + for tab_id in tab_ids: + norm_chars = [] + norm_to_docidx = [] + + for doc_i, ch in _iter_text_chars_from_body(doc, tab_id): + if ch.isalnum(): + norm_chars.append(ch.lower()) + norm_to_docidx.append(int(doc_i)) + + norm_text = "".join(norm_chars) + pos = norm_text.find(target) + + if pos != -1: + start_doc = norm_to_docidx[pos] + end_doc = norm_to_docidx[pos + len(target) - 1] + 1 + return { + "tabId": tab_id, + "start": int(start_doc), + "end": int(end_doc), + } + + return None + + + +def _loc(index, tab_id=None): + d = {"index": int(index)} + if tab_id: + d["tabId"] = tab_id + return d + + + +def _range(start, end, tab_id=None): + d = {"startIndex": int(start), "endIndex": int(end)} + if tab_id: + d["tabId"] = tab_id + return d + + +# ----------------------------------------------------------- +# DATA PARSING / FORMAT HELPERS +# ----------------------------------------------------------- +def normalizar_espacios(texto: str) -> str: + return re.sub(r"\s+", " ", str(texto or "").strip()) + + + +def parse_especificaciones(raw: Any) -> List[Dict[str, str]]: + if isinstance(raw, list): + return raw + + if raw is None: + return [] + + texto = str(raw).strip() + if not texto: + return [] + + try: + data = json.loads(texto) + if isinstance(data, list): + return data + except Exception: + pass + + try: + data = ast.literal_eval(texto) + if isinstance(data, list): + return data + except Exception: + pass + + raise RuntimeError("No se pudo interpretar especificaciones_tecnicas como JSON ni como lista Python.") + + + +def dividir_en_bullets(texto: str) -> List[str]: + texto = normalizar_espacios(texto) + if not texto: + return [] + + partes = re.split(r"(?<=[.!?])\s+(?=(?:[A-ZÁÉÍÓÚÑÜ]|\d))", texto) + return [p.strip() for p in partes if p.strip()] + + + +def construir_bloque_formateado(especificaciones_tecnicas: List[Dict[str, str]]): + partes: List[str] = [] + rangos_titulos: List[Tuple[int, int]] = [] + rangos_bullets: List[Tuple[int, int]] = [] + cursor = 0 + + items_validos = [] + for item in especificaciones_tecnicas or []: + clave = normalizar_espacios((item or {}).get("clave", "")) + valor = (item or {}).get("valor", "") + if clave: + items_validos.append({"clave": clave, "valor": valor}) + + for i, item in enumerate(items_validos): + titulo_completo = item["clave"] + bullets = dividir_en_bullets(item.get("valor", "")) + + ini_titulo = cursor + partes.append(titulo_completo) + cursor += len(titulo_completo) + fin_titulo = cursor + rangos_titulos.append((ini_titulo, fin_titulo)) + + partes.append("\n") + cursor += 1 + + for bullet in bullets: + ini_bullet = cursor + partes.append(bullet) + cursor += len(bullet) + fin_bullet = cursor + rangos_bullets.append((ini_bullet, fin_bullet)) + + partes.append("\n") + cursor += 1 + + if i < len(items_validos) - 1: + partes.append("\n") + cursor += 1 + + return "".join(partes), rangos_titulos, rangos_bullets + + +# ----------------------------------------------------------- +# MAIN ACTION +# ----------------------------------------------------------- +def reemplazar_detalles_tecnicos( + docs_service, + document_id: str, + marcador: str, + especificaciones_tecnicas: List[Dict[str, str]], +): + documento = docs_get(docs_service, document_id) + hit = find_marker_in_body(documento, marcador) + if not hit: + raise RuntimeError(f"No se encontró el marcador en el BODY del documento: {marcador}") + + marcador_inicio = int(hit["start"]) + marcador_fin = int(hit["end"]) + tab_id = hit.get("tabId") + + texto_insertar, rangos_titulos, rangos_bullets = construir_bloque_formateado(especificaciones_tecnicas) + if not texto_insertar.strip(): + raise RuntimeError("especificaciones_tecnicas está vacío o no contiene elementos válidos.") + + docs_service.documents().batchUpdate( + documentId=document_id, + body={ + "requests": [ + { + "deleteContentRange": { + "range": _range(marcador_inicio, marcador_fin, tab_id) + } + } + ] + }, + ).execute() + + docs_service.documents().batchUpdate( + documentId=document_id, + body={ + "requests": [ + { + "insertText": { + "location": _loc(marcador_inicio, tab_id), + "text": texto_insertar, + } + } + ] + }, + ).execute() + + requests_estilo = [] + + # 1) cada título queda como texto normal del documento, con su número real incluido y en negrita. + for ini, fin in rangos_titulos: + requests_estilo.append( + { + "updateTextStyle": { + "range": _range(marcador_inicio + ini, marcador_inicio + fin, tab_id), + "textStyle": {"bold": True}, + "fields": "bold", + } + } + ) + + # 2) solo los detalles van con viñeta simple. + for ini, fin in rangos_bullets: + requests_estilo.append( + { + "createParagraphBullets": { + "range": _range(marcador_inicio + ini, marcador_inicio + fin + 1, tab_id), + "bulletPreset": SIMPLE_BULLET_PRESET, + } + } + ) + + if requests_estilo: + docs_service.documents().batchUpdate( + documentId=document_id, + body={"requests": requests_estilo}, + ).execute() + + return { + "ok": True, + "document_id": document_id, + "marcador": marcador, + "titulos_formateados": len(rangos_titulos), + "bullets_formateados": len(rangos_bullets), + "texto_insertado": texto_insertar, + } + + +# Rocketbot quirk FIX +globals().update(locals()) + + +# ----------------------------------------------------------- +# MAIN +# ----------------------------------------------------------- +try: + _sv("error_detalles_tecnicos", "") + + url_doc = _gvs("current_url", "") + if url_doc == "": + raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.") + + raw_id = extract_doc_id_from_url(url_doc) + if not raw_id: + raise RuntimeError("No pude extraer documentId de la URL.") + + cred_path = _gvs("gdoc_sa_json", "") + if cred_path == "": + raise RuntimeError("Falta la variable gdoc_sa_json.") + if not os.path.isabs(cred_path): + cred_path = os.path.join(base_dir, cred_path) + if not os.path.exists(cred_path): + raise RuntimeError("No existe credentials.json: " + cred_path) + + token_path = _gvs("gdoc_token_json", "") + if token_path == "": + token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") + if not os.path.isabs(token_path): + token_path = os.path.join(base_dir, token_path) + + docs_service, drive_service, auth_mode = get_services(cred_path, token_path) + _sv("gdoc_auth_mode", auth_mode) + + doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) + _sv("gdoc_id", doc_id) + _sv("gdoc_converted", "1" if converted else "0") + _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit") + _sv("gdoc_name", meta_file.get("name", "")) + + marcador = _gvs("marker_detalles_tecnicos", MARCADOR_DEFAULT) + _sv("gdoc_marker_detalles_tecnicos_used", marcador) + + raw_especificaciones = globals().get("especificaciones_tecnicas", None) + if raw_especificaciones is None: + raw_especificaciones = _gv("especificaciones_tecnicas", None) + + especificaciones_tecnicas = parse_especificaciones(raw_especificaciones) + + resultado = reemplazar_detalles_tecnicos( + docs_service=docs_service, + document_id=doc_id, + marcador=marcador, + especificaciones_tecnicas=especificaciones_tecnicas, + ) + + _sv("status_proceso", "DETALLES_TECNICOS insertado correctamente") + _sv("detalles_tecnicos_insertados", "1") + _sv("detalles_tecnicos_resultado", json.dumps(resultado, ensure_ascii=False)) + + print("DETALLES_TECNICOS insertado correctamente") + print(json.dumps(resultado, ensure_ascii=False, indent=2)) + +except Exception as e: + import traceback + + error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}" + print(error_msg) + _sv("status_proceso", "ERROR") + _sv("error_detalles_tecnicos", error_msg) + raise