diff --git a/firma_digital_personalizacion.py b/firma_digital_personalizacion.py new file mode 100644 index 0000000..a3ae38c --- /dev/null +++ b/firma_digital_personalizacion.py @@ -0,0 +1,393 @@ +from __future__ import annotations + +import warnings +warnings.simplefilter("ignore", ResourceWarning) + +import os +import sys +import json +import re + +# -------------------- +# Boot libs (Rocketbot portable) +# -------------------- +base_dir = os.path.dirname(sys.executable) +libs_dir = os.path.join(base_dir, "py_libs", "py310") +sys.path.insert(0, libs_dir) + +for k in list(sys.modules.keys()): + if k == "pyparsing" or k.startswith("pyparsing."): + del sys.modules[k] + +from googleapiclient.discovery import build +from google.oauth2 import service_account +from google.oauth2.credentials import Credentials +from google_auth_oauthlib.flow import InstalledAppFlow +from google.auth.transport.requests import Request as GRequest + + +# ----------------------------------------------------------- +# GOOGLE AUTH +# ----------------------------------------------------------- +SCOPES = {scopes_api_google} + + +def _load_json(path): + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +def get_services(credentials_json_path, token_json_path): + info = _load_json(credentials_json_path) + + if isinstance(info, dict) and info.get("type") == "service_account": + creds = service_account.Credentials.from_service_account_file( + credentials_json_path, + scopes=SCOPES, + ) + docs = build("docs", "v1", credentials=creds, cache_discovery=False) + drive = build("drive", "v3", credentials=creds, cache_discovery=False) + return docs, drive, "service_account" + + creds = None + if os.path.exists(token_json_path): + creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) + + if (not creds) or (not creds.valid): + if creds and creds.expired and creds.refresh_token: + creds.refresh(GRequest()) + else: + flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) + try: + creds = flow.run_local_server(port=0) + except Exception: + creds = flow.run_console() + + os.makedirs(os.path.dirname(token_json_path), exist_ok=True) + with open(token_json_path, "w", encoding="utf-8") as f: + f.write(creds.to_json()) + + docs = build("docs", "v1", credentials=creds, cache_discovery=False) + drive = build("drive", "v3", credentials=creds, cache_discovery=False) + return docs, drive, "oauth" + + +# ----------------------------------------------------------- +# ROCKETBOT VAR HELPERS +# ----------------------------------------------------------- +def _sv(name, value): + try: + SetVar(name, value) + except Exception: + pass + + +def _missing(v) -> bool: + if v is None: + return True + if isinstance(v, str): + s = v.strip() + return s == "" or s == "ERROR_NOT_VAR" + return False + + +def _gv(name, default=None): + try: + v = GetVar(name) + except Exception: + return default + return default if _missing(v) else v + + +def _gvs(name, default="") -> str: + v = _gv(name, default) + if v is None: + return default + s = str(v).strip() + return default if (s == "" or s == "ERROR_NOT_VAR") else s + + +# ----------------------------------------------------------- +# DRIVE / DOC HELPERS +# ----------------------------------------------------------- +def extract_doc_id_from_url(url): + m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") + return m.group(1) if m else "" + + + +def ensure_docs_api_compatible(drive_service, file_id: str): + meta = drive_service.files().get( + fileId=file_id, + fields="id,name,mimeType,shortcutDetails", + supportsAllDrives=True, + ).execute() + + if meta.get("mimeType") == "application/vnd.google-apps.shortcut": + target = (meta.get("shortcutDetails") or {}).get("targetId", "") + if not target: + raise RuntimeError("Es shortcut pero no trae targetId.") + file_id = target + meta = drive_service.files().get( + fileId=file_id, + fields="id,name,mimeType", + supportsAllDrives=True, + ).execute() + + if meta.get("mimeType") != "application/vnd.google-apps.document": + new_name = (meta.get("name") or "Documento") + " (Google Docs)" + converted = drive_service.files().copy( + fileId=file_id, + body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, + fields="id,name,mimeType", + supportsAllDrives=True, + ).execute() + return converted["id"], converted, True + + return file_id, meta, False + + + +def docs_get(docs_service, doc_id: str): + return docs_service.documents().get(documentId=doc_id).execute() + + +# ----------------------------------------------------------- +# BUSINESS LOGIC +# ----------------------------------------------------------- +def obtener_textos_reemplazo(firma_digital: str): + firma_digital = (firma_digital or "").strip().upper() + + if firma_digital == "V": + texto_clausula = ( + "Forman parte integrante e inseparable del presente contrato los Anexos " + "que se detallan a continuación, los cuales se consideran plenamente " + "conocidos y aceptados por las partes con la sola suscripción del " + "presente instrumento. " + ) + texto_firmas_secundarias = "" + else: + texto_clausula = ( + "Porción del presente contrato debiendo ser firmados como parte " + "integrante los siguientes ANEXOS:\n" + ) + texto_firmas_secundarias = ( + "___________________________________\t\t_________________________________\n" + "COMPRADOR\t\t\t\t\t\tVENDEDOR" + ) + + return texto_clausula, texto_firmas_secundarias + + + +def obtener_texto_e_indices(documento: dict): + texto = [] + indices = [] + + def recorrer_elementos(elementos): + for el in elementos or []: + parrafo = el.get("paragraph") + if parrafo: + for pe in parrafo.get("elements", []): + text_run = pe.get("textRun") + if not text_run: + continue + contenido = text_run.get("content", "") + start_index = pe.get("startIndex") + if start_index is None: + continue + for i, ch in enumerate(contenido): + texto.append(ch) + indices.append(start_index + i) + + tabla = el.get("table") + if tabla: + for fila in tabla.get("tableRows", []): + for celda in fila.get("tableCells", []): + recorrer_elementos(celda.get("content", [])) + + toc = el.get("tableOfContents") + if toc: + recorrer_elementos(toc.get("content", [])) + + recorrer_elementos((documento.get("body") or {}).get("content", [])) + return "".join(texto), indices + + + +def quitar_negrita_marker_y_siguiente_char(docs_service, document_id: str, marker_text: str): + documento = docs_get(docs_service, document_id) + texto_completo, indices = obtener_texto_e_indices(documento) + + if not marker_text: + return 0 + + requests = [] + pos = 0 + while True: + idx = texto_completo.find(marker_text, pos) + if idx == -1: + break + + start_index = indices[idx] + end_pos = idx + len(marker_text) + if end_pos < len(indices): + end_index = indices[end_pos] + 1 + else: + end_index = indices[end_pos - 1] + 1 + + requests.append({ + "updateTextStyle": { + "range": { + "startIndex": start_index, + "endIndex": end_index, + }, + "textStyle": { + "bold": False, + }, + "fields": "bold", + } + }) + pos = idx + len(marker_text) + + if requests: + docs_service.documents().batchUpdate( + documentId=document_id, + body={"requests": requests}, + ).execute() + + return len(requests) + + +def reemplazar_clausula_y_firmas(docs_service, document_id: str, firma_digital: str, + marker_clausula: str = "CLAUSULA_FIRMA_DIGITAL", + marker_firmas: str = "FIRMAS_SECUNDARIAS"): + texto_clausula, texto_firmas_secundarias = obtener_textos_reemplazo(firma_digital) + + requests = [ + { + "replaceAllText": { + "containsText": { + "text": marker_clausula, + "matchCase": True, + }, + "replaceText": texto_clausula, + } + }, + { + "replaceAllText": { + "containsText": { + "text": marker_firmas, + "matchCase": True, + }, + "replaceText": texto_firmas_secundarias, + } + }, + ] + + return docs_service.documents().batchUpdate( + documentId=document_id, + body={"requests": requests}, + ).execute() + + +# Rocketbot quirk FIX +globals().update(locals()) + + +# ----------------------------------------------------------- +# MAIN +# ----------------------------------------------------------- +try: + _sv("error_firma_digital", "") + + url_doc = _gvs("current_url", "") + if url_doc == "": + raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.") + + _sv("debug_current_url_var", url_doc) + + raw_id = extract_doc_id_from_url(url_doc) + if not raw_id: + raise RuntimeError("No pude extraer documentId de la URL.") + + _sv("gdoc_original_id", raw_id) + + cred_path = _gvs("gdoc_sa_json", "") + if cred_path == "": + raise RuntimeError("Falta la variable gdoc_sa_json.") + + if not os.path.isabs(cred_path): + cred_path = os.path.join(base_dir, cred_path) + + if not os.path.exists(cred_path): + raise RuntimeError("No existe credentials.json: " + cred_path) + + token_path = _gvs("gdoc_token_json", "") + if token_path == "": + token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") + if not os.path.isabs(token_path): + token_path = os.path.join(base_dir, token_path) + + docs_service, drive_service, auth_mode = get_services(cred_path, token_path) + _sv("gdoc_auth_mode", auth_mode) + + doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) + _sv("gdoc_id", doc_id) + _sv("gdoc_converted", "1" if converted else "0") + _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit") + _sv("gdoc_name", meta_file.get("name", "")) + + doc_before = docs_get(docs_service, doc_id) + _sv("gdoc_revision_before", doc_before.get("revisionId", "")) + + firma_digital = _gvs("firma_digital", "") + marker_clausula = _gvs("marker_clausula_firma_digital", "CLAUSULA_FIRMA_DIGITAL") + marker_firmas = _gvs("marker_firmas_secundarias", "FIRMAS_SECUNDARIAS") + + if firma_digital == "": + raise RuntimeError("Falta la variable firma_digital.") + + _sv("firma_digital_usada", firma_digital) + _sv("marker_clausula_firma_digital_usado", marker_clausula) + _sv("marker_firmas_secundarias_usado", marker_firmas) + + ocurrencias_desnegritadas = quitar_negrita_marker_y_siguiente_char( + docs_service=docs_service, + document_id=doc_id, + marker_text=marker_clausula, + ) + _sv("marker_clausula_desnegritado_previo", str(ocurrencias_desnegritadas)) + + resultado = reemplazar_clausula_y_firmas( + docs_service=docs_service, + document_id=doc_id, + firma_digital=firma_digital, + marker_clausula=marker_clausula, + marker_firmas=marker_firmas, + ) + + replies = resultado.get("replies") or [] + ocurrencias_clausula = 0 + ocurrencias_firmas = 0 + + if len(replies) > 0: + ocurrencias_clausula = int(((replies[0] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0)) + if len(replies) > 1: + ocurrencias_firmas = int(((replies[1] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0)) + + _sv("reemplazos_clausula_firma_digital", str(ocurrencias_clausula)) + _sv("reemplazos_firmas_secundarias", str(ocurrencias_firmas)) + _sv("status_proceso", "Cláusula y firmas reemplazadas correctamente") + + print("Reemplazos realizados correctamente.") + print(resultado) + +except Exception as e: + import traceback + + error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}" + print(error_msg) + _sv("status_proceso", "ERROR") + _sv("error_firma_digital", error_msg) + raise