from __future__ import annotations import warnings warnings.simplefilter("ignore", ResourceWarning) import os import sys import json import re # -------------------- # Boot libs (Rocketbot portable) # -------------------- base_dir = os.path.dirname(sys.executable) libs_dir = os.path.join(base_dir, "py_libs", "py310") sys.path.insert(0, libs_dir) for k in list(sys.modules.keys()): if k == "pyparsing" or k.startswith("pyparsing."): del sys.modules[k] from googleapiclient.discovery import build from google.oauth2 import service_account from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request as GRequest # ----------------------------------------------------------- # GOOGLE AUTH # ----------------------------------------------------------- SCOPES = {scopes_api_google} def _load_json(path): with open(path, "r", encoding="utf-8") as f: return json.load(f) def get_services(credentials_json_path, token_json_path): info = _load_json(credentials_json_path) if isinstance(info, dict) and info.get("type") == "service_account": creds = service_account.Credentials.from_service_account_file( credentials_json_path, scopes=SCOPES, ) docs = build("docs", "v1", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False) return docs, drive, "service_account" creds = None if os.path.exists(token_json_path): creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) if (not creds) or (not creds.valid): if creds and creds.expired and creds.refresh_token: creds.refresh(GRequest()) else: flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) try: creds = flow.run_local_server(port=0) except Exception: creds = flow.run_console() os.makedirs(os.path.dirname(token_json_path), exist_ok=True) with open(token_json_path, "w", encoding="utf-8") as f: f.write(creds.to_json()) docs = build("docs", "v1", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False) return docs, drive, "oauth" # ----------------------------------------------------------- # ROCKETBOT VAR HELPERS # ----------------------------------------------------------- def _sv(name, value): try: SetVar(name, value) except Exception: pass def _missing(v) -> bool: if v is None: return True if isinstance(v, str): s = v.strip() return s == "" or s == "ERROR_NOT_VAR" return False def _gv(name, default=None): try: v = GetVar(name) except Exception: return default return default if _missing(v) else v def _gvs(name, default="") -> str: v = _gv(name, default) if v is None: return default s = str(v).strip() return default if (s == "" or s == "ERROR_NOT_VAR") else s # ----------------------------------------------------------- # DRIVE / DOC HELPERS # ----------------------------------------------------------- def extract_doc_id_from_url(url): m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") return m.group(1) if m else "" def ensure_docs_api_compatible(drive_service, file_id: str): meta = drive_service.files().get( fileId=file_id, fields="id,name,mimeType,shortcutDetails", supportsAllDrives=True, ).execute() if meta.get("mimeType") == "application/vnd.google-apps.shortcut": target = (meta.get("shortcutDetails") or {}).get("targetId", "") if not target: raise RuntimeError("Es shortcut pero no trae targetId.") file_id = target meta = drive_service.files().get( fileId=file_id, fields="id,name,mimeType", supportsAllDrives=True, ).execute() if meta.get("mimeType") != "application/vnd.google-apps.document": new_name = (meta.get("name") or "Documento") + " (Google Docs)" converted = drive_service.files().copy( fileId=file_id, body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, fields="id,name,mimeType", supportsAllDrives=True, ).execute() return converted["id"], converted, True return file_id, meta, False def docs_get(docs_service, doc_id: str): return docs_service.documents().get(documentId=doc_id).execute() # ----------------------------------------------------------- # BUSINESS LOGIC # ----------------------------------------------------------- def obtener_textos_reemplazo(firma_digital: str): firma_digital = str(firma_digital).strip().upper() if firma_digital == "TRUE": texto_clausula = ( "Forman parte integrante e inseparable del presente contrato los Anexos " "que se detallan a continuación, los cuales se consideran plenamente " "conocidos y aceptados por las partes con la sola suscripción del " "presente instrumento. " ) texto_firmas_secundarias = "" else: texto_clausula = ( "Porción del presente contrato debiendo ser firmados como parte " "integrante los siguientes ANEXOS:\n" ) texto_firmas_secundarias = ( "___________________________________\t\t_________________________________\n" "COMPRADOR\t\t\t\t\t\tVENDEDOR" ) return texto_clausula, texto_firmas_secundarias def obtener_texto_e_indices(documento: dict): texto = [] indices = [] def recorrer_elementos(elementos): for el in elementos or []: parrafo = el.get("paragraph") if parrafo: for pe in parrafo.get("elements", []): text_run = pe.get("textRun") if not text_run: continue contenido = text_run.get("content", "") start_index = pe.get("startIndex") if start_index is None: continue for i, ch in enumerate(contenido): texto.append(ch) indices.append(start_index + i) tabla = el.get("table") if tabla: for fila in tabla.get("tableRows", []): for celda in fila.get("tableCells", []): recorrer_elementos(celda.get("content", [])) toc = el.get("tableOfContents") if toc: recorrer_elementos(toc.get("content", [])) recorrer_elementos((documento.get("body") or {}).get("content", [])) return "".join(texto), indices def quitar_negrita_marker_y_siguiente_char(docs_service, document_id: str, marker_text: str): documento = docs_get(docs_service, document_id) texto_completo, indices = obtener_texto_e_indices(documento) if not marker_text: return 0 requests = [] pos = 0 while True: idx = texto_completo.find(marker_text, pos) if idx == -1: break start_index = indices[idx] end_pos = idx + len(marker_text) if end_pos < len(indices): end_index = indices[end_pos] + 1 else: end_index = indices[end_pos - 1] + 1 requests.append({ "updateTextStyle": { "range": { "startIndex": start_index, "endIndex": end_index, }, "textStyle": { "bold": False, }, "fields": "bold", } }) pos = idx + len(marker_text) if requests: docs_service.documents().batchUpdate( documentId=document_id, body={"requests": requests}, ).execute() return len(requests) def reemplazar_clausula_y_firmas(docs_service, document_id: str, firma_digital: str, marker_clausula: str = "CLAUSULA_FIRMA_DIGITAL", marker_firmas: str = "FIRMAS_SECUNDARIAS"): texto_clausula, texto_firmas_secundarias = obtener_textos_reemplazo(firma_digital) requests = [ { "replaceAllText": { "containsText": { "text": marker_clausula, "matchCase": True, }, "replaceText": texto_clausula, } }, { "replaceAllText": { "containsText": { "text": marker_firmas, "matchCase": True, }, "replaceText": texto_firmas_secundarias, } }, ] return docs_service.documents().batchUpdate( documentId=document_id, body={"requests": requests}, ).execute() # Rocketbot quirk FIX globals().update(locals()) # ----------------------------------------------------------- # MAIN # ----------------------------------------------------------- try: _sv("error_firma_digital", "") url_doc = _gvs("current_url", "") if url_doc == "": raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.") _sv("debug_current_url_var", url_doc) raw_id = extract_doc_id_from_url(url_doc) if not raw_id: raise RuntimeError("No pude extraer documentId de la URL.") _sv("gdoc_original_id", raw_id) cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": raise RuntimeError("Falta la variable gdoc_sa_json.") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) if not os.path.exists(cred_path): raise RuntimeError("No existe credentials.json: " + cred_path) token_path = _gvs("gdoc_token_json", "") if token_path == "": token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") if not os.path.isabs(token_path): token_path = os.path.join(base_dir, token_path) docs_service, drive_service, auth_mode = get_services(cred_path, token_path) _sv("gdoc_auth_mode", auth_mode) doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) _sv("gdoc_id", doc_id) _sv("gdoc_converted", "1" if converted else "0") _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit") _sv("gdoc_name", meta_file.get("name", "")) doc_before = docs_get(docs_service, doc_id) _sv("gdoc_revision_before", doc_before.get("revisionId", "")) firma_digital = _gvs("firma_digital", "") marker_clausula = _gvs("marker_clausula_firma_digital", "CLAUSULA_FIRMA_DIGITAL") marker_firmas = _gvs("marker_firmas_secundarias", "FIRMAS_SECUNDARIAS") if firma_digital == "": raise RuntimeError("Falta la variable firma_digital.") _sv("firma_digital_usada", firma_digital) _sv("marker_clausula_firma_digital_usado", marker_clausula) _sv("marker_firmas_secundarias_usado", marker_firmas) ocurrencias_desnegritadas = quitar_negrita_marker_y_siguiente_char( docs_service=docs_service, document_id=doc_id, marker_text=marker_clausula, ) _sv("marker_clausula_desnegritado_previo", str(ocurrencias_desnegritadas)) resultado = reemplazar_clausula_y_firmas( docs_service=docs_service, document_id=doc_id, firma_digital=firma_digital, marker_clausula=marker_clausula, marker_firmas=marker_firmas, ) replies = resultado.get("replies") or [] ocurrencias_clausula = 0 ocurrencias_firmas = 0 if len(replies) > 0: ocurrencias_clausula = int(((replies[0] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0)) if len(replies) > 1: ocurrencias_firmas = int(((replies[1] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0)) _sv("reemplazos_clausula_firma_digital", str(ocurrencias_clausula)) _sv("reemplazos_firmas_secundarias", str(ocurrencias_firmas)) _sv("status_proceso", "Cláusula y firmas reemplazadas correctamente") print("Reemplazos realizados correctamente.") print(resultado) except Exception as e: import traceback error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}" print(error_msg) _sv("status_proceso", "ERROR") _sv("error_firma_digital", error_msg) raise