diff --git a/agregar_planos.py b/agregar_planos.py index 7d1fab2..00ecffb 100644 --- a/agregar_planos.py +++ b/agregar_planos.py @@ -68,7 +68,7 @@ def _load_json(path): def get_services(credentials_json_path, impersonated_user): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get("type") != "service_account": - raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") + raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: @@ -548,9 +548,9 @@ try: url1 = _gvs("url_plano_producto", "") url2 = _gvs("url_plano_piso", "") - cred_path = _gvs("gdoc_sa_json2", "") + cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": - raise RuntimeError("Falta gdoc_sa_json2 (ruta al JSON de cuenta de servicio)") + raise RuntimeError("Falta gdoc_sa_json (ruta al JSON de cuenta de servicio)") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) if not os.path.exists(cred_path): diff --git a/clausulas_durante_obra.py b/clausulas_durante_obra.py index 5690738..baa9d9e 100644 --- a/clausulas_durante_obra.py +++ b/clausulas_durante_obra.py @@ -42,7 +42,7 @@ def _load_json(path): def get_services(credentials_json_path, impersonated_user): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get("type") != "service_account": - raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") + raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: @@ -672,9 +672,9 @@ try: _sv("gdoc_original_id", raw_id) - cred_path = _gvs("gdoc_sa_json2", "") + cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": - raise RuntimeError("Falta la variable gdoc_sa_json2.") + raise RuntimeError("Falta la variable gdoc_sa_json.") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) diff --git a/clausulas_financiado.py b/clausulas_financiado.py index a092b93..482159e 100644 --- a/clausulas_financiado.py +++ b/clausulas_financiado.py @@ -42,7 +42,7 @@ def _load_json(path): def get_services(credentials_json_path, impersonated_user): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get("type") != "service_account": - raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") + raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: @@ -672,9 +672,9 @@ try: _sv("gdoc_original_id", raw_id) - cred_path = _gvs("gdoc_sa_json2", "") + cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": - raise RuntimeError("Falta la variable gdoc_sa_json2.") + raise RuntimeError("Falta la variable gdoc_sa_json.") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) diff --git a/detalles_tecnicos.py b/detalles_tecnicos.py deleted file mode 100644 index ade10bb..0000000 --- a/detalles_tecnicos.py +++ /dev/null @@ -1,547 +0,0 @@ -from __future__ import annotations - -import warnings -warnings.simplefilter("ignore", ResourceWarning) - -import os -import sys -import json -import re -import ast -import unicodedata -from typing import Any, Dict, List, Tuple - -# -------------------- -# Boot libs (Rocketbot portable) -# -------------------- -base_dir = os.path.dirname(sys.executable) -libs_dir = os.path.join(base_dir, "py_libs", "py310") -sys.path.insert(0, libs_dir) - -for k in list(sys.modules.keys()): - if k == "pyparsing" or k.startswith("pyparsing."): - del sys.modules[k] - -from googleapiclient.discovery import build -from google.oauth2 import service_account -from google.oauth2.credentials import Credentials -from google_auth_oauthlib.flow import InstalledAppFlow -from google.auth.transport.requests import Request as GRequest - -# ----------------------------------------------------------- -# GOOGLE AUTH -# ----------------------------------------------------------- -# Este placeholder queda igual para Rocketbot. -SCOPES = {scopes_api_google} - -MARCADOR_DEFAULT = "DETALLES_TECNICOS" -SIMPLE_BULLET_PRESET = "BULLET_DISC_CIRCLE_SQUARE" - - -# ----------------------------------------------------------- -# ROCKETBOT VAR HELPERS -# ----------------------------------------------------------- -def _sv(name, value): - try: - SetVar(name, value) - except Exception: - pass - - -def _missing(v) -> bool: - if v is None: - return True - if isinstance(v, str): - s = v.strip() - return s == "" or s == "ERROR_NOT_VAR" - return False - - -def _gv(name, default=None): - try: - v = GetVar(name) - except Exception: - return default - return default if _missing(v) else v - - -def _gvs(name, default="") -> str: - v = _gv(name, default) - if v is None: - return default - s = str(v).strip() - return default if (s == "" or s == "ERROR_NOT_VAR") else s - - -# ----------------------------------------------------------- -# GOOGLE HELPERS -# ----------------------------------------------------------- -def _load_json(path): - with open(path, "r", encoding="utf-8") as f: - return json.load(f) - - - -def get_services(credentials_json_path, token_json_path): - info = _load_json(credentials_json_path) - - if isinstance(info, dict) and info.get("type") == "service_account": - creds = service_account.Credentials.from_service_account_file( - credentials_json_path, - scopes=SCOPES, - ) - docs = build("docs", "v1", credentials=creds, cache_discovery=False) - drive = build("drive", "v3", credentials=creds, cache_discovery=False) - return docs, drive, "service_account" - - creds = None - if os.path.exists(token_json_path): - creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) - - if (not creds) or (not creds.valid): - if creds and creds.expired and creds.refresh_token: - creds.refresh(GRequest()) - else: - flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) - try: - creds = flow.run_local_server(port=0) - except Exception: - creds = flow.run_console() - - os.makedirs(os.path.dirname(token_json_path), exist_ok=True) - with open(token_json_path, "w", encoding="utf-8") as f: - f.write(creds.to_json()) - - docs = build("docs", "v1", credentials=creds, cache_discovery=False) - drive = build("drive", "v3", credentials=creds, cache_discovery=False) - return docs, drive, "oauth" - - - -def extract_doc_id_from_url(url: str) -> str: - m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") - return m.group(1) if m else "" - - - -def ensure_docs_api_compatible(drive_service, file_id: str): - meta = drive_service.files().get( - fileId=file_id, - fields="id,name,mimeType,shortcutDetails", - supportsAllDrives=True, - ).execute() - - if meta.get("mimeType") == "application/vnd.google-apps.shortcut": - target = (meta.get("shortcutDetails") or {}).get("targetId", "") - if not target: - raise RuntimeError("Es shortcut pero no trae targetId.") - file_id = target - meta = drive_service.files().get( - fileId=file_id, - fields="id,name,mimeType", - supportsAllDrives=True, - ).execute() - - if meta.get("mimeType") != "application/vnd.google-apps.document": - new_name = (meta.get("name") or "Documento") + " (Google Docs)" - converted = drive_service.files().copy( - fileId=file_id, - body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, - fields="id,name,mimeType", - supportsAllDrives=True, - ).execute() - return converted["id"], converted, True - - return file_id, meta, False - - - -def docs_get(docs_service, doc_id: str): - return docs_service.documents().get(documentId=doc_id).execute() - - -# ----------------------------------------------------------- -# DOC STRUCTURE HELPERS -# ----------------------------------------------------------- -def _norm_alnum(s: str) -> str: - if s is None: - return "" - s = unicodedata.normalize("NFKD", str(s)).lower() - out = [] - for ch in s: - if unicodedata.category(ch) == "Mn": - continue - if ch.isalnum(): - out.append(ch) - return "".join(out) - - - -def _walk_tabs(tabs): - if not tabs: - return - for t in tabs: - yield t - for x in _walk_tabs(t.get("childTabs") or []): - yield x - - - -def _get_tab_body_content(doc, tab_id): - tabs = doc.get("tabs") - if not tabs: - return (doc.get("body") or {}).get("content", []) - - for t in _walk_tabs(tabs): - tid = ((t.get("tabProperties") or {}).get("tabId")) or "" - if tid == (tab_id or ""): - dtab = t.get("documentTab") or {} - return (dtab.get("body") or {}).get("content", []) - - return [] - - - -def _iter_text_chars_from_body(doc, tab_id): - content = _get_tab_body_content(doc, tab_id) - - def walk(content_list): - for el in content_list or []: - if not isinstance(el, dict): - continue - - p = el.get("paragraph") - if p: - for pe in p.get("elements", []): - tr = pe.get("textRun") - if not tr: - continue - txt = tr.get("content", "") - st = pe.get("startIndex") - if st is None or txt is None: - continue - for i, ch in enumerate(txt): - yield int(st) + i, ch - continue - - t = el.get("table") - if t: - for row in t.get("tableRows", []): - for cell in row.get("tableCells", []): - yield from walk(cell.get("content", [])) - continue - - toc = el.get("tableOfContents") - if toc: - yield from walk(toc.get("content", [])) - continue - - yield from walk(content) - - - -def find_marker_in_body(doc, marker: str): - target = _norm_alnum(marker) - if not target: - return None - - tabs = doc.get("tabs") - tab_ids = [None] - if tabs: - tab_ids = [] - for t in _walk_tabs(tabs): - tid = ((t.get("tabProperties") or {}).get("tabId")) or "" - tab_ids.append(tid) - - for tab_id in tab_ids: - norm_chars = [] - norm_to_docidx = [] - - for doc_i, ch in _iter_text_chars_from_body(doc, tab_id): - if ch.isalnum(): - norm_chars.append(ch.lower()) - norm_to_docidx.append(int(doc_i)) - - norm_text = "".join(norm_chars) - pos = norm_text.find(target) - - if pos != -1: - start_doc = norm_to_docidx[pos] - end_doc = norm_to_docidx[pos + len(target) - 1] + 1 - return { - "tabId": tab_id, - "start": int(start_doc), - "end": int(end_doc), - } - - return None - - - -def _loc(index, tab_id=None): - d = {"index": int(index)} - if tab_id: - d["tabId"] = tab_id - return d - - - -def _range(start, end, tab_id=None): - d = {"startIndex": int(start), "endIndex": int(end)} - if tab_id: - d["tabId"] = tab_id - return d - - -# ----------------------------------------------------------- -# DATA PARSING / FORMAT HELPERS -# ----------------------------------------------------------- -def normalizar_espacios(texto: str) -> str: - return re.sub(r"\s+", " ", str(texto or "").strip()) - - - -def parse_especificaciones(raw: Any) -> List[Dict[str, str]]: - if isinstance(raw, list): - return raw - - if raw is None: - return [] - - texto = str(raw).strip() - if not texto: - return [] - - try: - data = json.loads(texto) - if isinstance(data, list): - return data - except Exception: - pass - - try: - data = ast.literal_eval(texto) - if isinstance(data, list): - return data - except Exception: - pass - - raise RuntimeError("No se pudo interpretar especificaciones_tecnicas como JSON ni como lista Python.") - - - -def dividir_en_bullets(texto: str) -> List[str]: - texto = normalizar_espacios(texto) - if not texto: - return [] - - partes = re.split(r"(?<=[.!?])\s+(?=(?:[A-ZÁÉÍÓÚÑÜ]|\d))", texto) - return [p.strip() for p in partes if p.strip()] - - - -def construir_bloque_formateado(especificaciones_tecnicas: List[Dict[str, str]]): - partes: List[str] = [] - rangos_titulos: List[Tuple[int, int]] = [] - rangos_bullets: List[Tuple[int, int]] = [] - cursor = 0 - - items_validos = [] - for item in especificaciones_tecnicas or []: - clave = normalizar_espacios((item or {}).get("clave", "")) - valor = (item or {}).get("valor", "") - if clave: - items_validos.append({"clave": clave, "valor": valor}) - - for i, item in enumerate(items_validos): - titulo_completo = item["clave"] - bullets = dividir_en_bullets(item.get("valor", "")) - - ini_titulo = cursor - partes.append(titulo_completo) - cursor += len(titulo_completo) - fin_titulo = cursor - rangos_titulos.append((ini_titulo, fin_titulo)) - - partes.append("\n") - cursor += 1 - - for bullet in bullets: - ini_bullet = cursor - partes.append(bullet) - cursor += len(bullet) - fin_bullet = cursor - rangos_bullets.append((ini_bullet, fin_bullet)) - - partes.append("\n") - cursor += 1 - - if i < len(items_validos) - 1: - partes.append("\n") - cursor += 1 - - return "".join(partes), rangos_titulos, rangos_bullets - - -# ----------------------------------------------------------- -# MAIN ACTION -# ----------------------------------------------------------- -def reemplazar_detalles_tecnicos( - docs_service, - document_id: str, - marcador: str, - especificaciones_tecnicas: List[Dict[str, str]], -): - documento = docs_get(docs_service, document_id) - hit = find_marker_in_body(documento, marcador) - if not hit: - raise RuntimeError(f"No se encontró el marcador en el BODY del documento: {marcador}") - - marcador_inicio = int(hit["start"]) - marcador_fin = int(hit["end"]) - tab_id = hit.get("tabId") - - texto_insertar, rangos_titulos, rangos_bullets = construir_bloque_formateado(especificaciones_tecnicas) - if not texto_insertar.strip(): - raise RuntimeError("especificaciones_tecnicas está vacío o no contiene elementos válidos.") - - docs_service.documents().batchUpdate( - documentId=document_id, - body={ - "requests": [ - { - "deleteContentRange": { - "range": _range(marcador_inicio, marcador_fin, tab_id) - } - } - ] - }, - ).execute() - - docs_service.documents().batchUpdate( - documentId=document_id, - body={ - "requests": [ - { - "insertText": { - "location": _loc(marcador_inicio, tab_id), - "text": texto_insertar, - } - } - ] - }, - ).execute() - - requests_estilo = [] - - # 1) cada título queda como texto normal del documento, con su número real incluido y en negrita. - for ini, fin in rangos_titulos: - requests_estilo.append( - { - "updateTextStyle": { - "range": _range(marcador_inicio + ini, marcador_inicio + fin, tab_id), - "textStyle": {"bold": True}, - "fields": "bold", - } - } - ) - - # 2) solo los detalles van con viñeta simple. - for ini, fin in rangos_bullets: - requests_estilo.append( - { - "createParagraphBullets": { - "range": _range(marcador_inicio + ini, marcador_inicio + fin + 1, tab_id), - "bulletPreset": SIMPLE_BULLET_PRESET, - } - } - ) - - if requests_estilo: - docs_service.documents().batchUpdate( - documentId=document_id, - body={"requests": requests_estilo}, - ).execute() - - return { - "ok": True, - "document_id": document_id, - "marcador": marcador, - "titulos_formateados": len(rangos_titulos), - "bullets_formateados": len(rangos_bullets), - "texto_insertado": texto_insertar, - } - - -# Rocketbot quirk FIX -globals().update(locals()) - - -# ----------------------------------------------------------- -# MAIN -# ----------------------------------------------------------- -try: - _sv("error_detalles_tecnicos", "") - - url_doc = _gvs("current_url", "") - if url_doc == "": - raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.") - - raw_id = extract_doc_id_from_url(url_doc) - if not raw_id: - raise RuntimeError("No pude extraer documentId de la URL.") - - cred_path = _gvs("gdoc_sa_json", "") - if cred_path == "": - raise RuntimeError("Falta la variable gdoc_sa_json.") - if not os.path.isabs(cred_path): - cred_path = os.path.join(base_dir, cred_path) - if not os.path.exists(cred_path): - raise RuntimeError("No existe credentials.json: " + cred_path) - - token_path = _gvs("gdoc_token_json", "") - if token_path == "": - token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") - if not os.path.isabs(token_path): - token_path = os.path.join(base_dir, token_path) - - docs_service, drive_service, auth_mode = get_services(cred_path, token_path) - _sv("gdoc_auth_mode", auth_mode) - - doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) - _sv("gdoc_id", doc_id) - _sv("gdoc_converted", "1" if converted else "0") - _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit") - _sv("gdoc_name", meta_file.get("name", "")) - - marcador = _gvs("marker_detalles_tecnicos", MARCADOR_DEFAULT) - _sv("gdoc_marker_detalles_tecnicos_used", marcador) - - raw_especificaciones = globals().get("especificaciones_tecnicas", None) - if raw_especificaciones is None: - raw_especificaciones = _gv("especificaciones_tecnicas", None) - - especificaciones_tecnicas = parse_especificaciones(raw_especificaciones) - - resultado = reemplazar_detalles_tecnicos( - docs_service=docs_service, - document_id=doc_id, - marcador=marcador, - especificaciones_tecnicas=especificaciones_tecnicas, - ) - - _sv("status_proceso", "DETALLES_TECNICOS insertado correctamente") - _sv("detalles_tecnicos_insertados", "1") - _sv("detalles_tecnicos_resultado", json.dumps(resultado, ensure_ascii=False)) - - print("DETALLES_TECNICOS insertado correctamente") - print(json.dumps(resultado, ensure_ascii=False, indent=2)) - -except Exception as e: - import traceback - - error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}" - print(error_msg) - _sv("status_proceso", "ERROR") - _sv("error_detalles_tecnicos", error_msg) - raise diff --git a/duplicar_plantilla_contrato.py b/duplicar_plantilla_contrato.py index f1972d8..5ce5f3a 100644 --- a/duplicar_plantilla_contrato.py +++ b/duplicar_plantilla_contrato.py @@ -34,7 +34,7 @@ def rb_get_var(name, required=True, default=''): SCOPES = {scopes_api_google} -CREDENTIALS_PATH = rb_get_var('gdoc_sa_json2') +CREDENTIALS_PATH = rb_get_var('gdoc_sa_json') IMPERSONATED_USER = rb_get_var('gdoc_impersonated_user') @@ -46,7 +46,7 @@ def _load_json(path): def get_drive_service(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get('type') != 'service_account': - raise RuntimeError('gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.') + raise RuntimeError('gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.') impersonated_user = (impersonated_user or '').strip() if not impersonated_user: diff --git a/envio_contrato_docs_email.py b/envio_contrato_docs_email.py index 7710141..07af678 100644 --- a/envio_contrato_docs_email.py +++ b/envio_contrato_docs_email.py @@ -23,7 +23,7 @@ if os.path.isdir(libs_dir) and libs_dir not in sys.path: SCOPES = {scopes_api_google} -CREDENTIALS_PATH = '{gdoc_sa_json2}' +CREDENTIALS_PATH = '{gdoc_sa_json}' IMPERSONATED_USER = '{gdoc_impersonated_user}' @@ -59,7 +59,7 @@ def _load_json(path): def get_drive_and_gmail_services(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get('type') != 'service_account': - raise RuntimeError('gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.') + raise RuntimeError('gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.') impersonated_user = (impersonated_user or '').strip() if not impersonated_user: diff --git a/firma_digital_personalizacion.py b/firma_digital_personalizacion.py index d5b3a11..6aea0af 100644 --- a/firma_digital_personalizacion.py +++ b/firma_digital_personalizacion.py @@ -37,7 +37,7 @@ def _load_json(path): def get_services(credentials_json_path, impersonated_user): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get("type") != "service_account": - raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") + raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: @@ -294,9 +294,9 @@ try: _sv("gdoc_original_id", raw_id) - cred_path = _gvs("gdoc_sa_json2", "") + cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": - raise RuntimeError("Falta la variable gdoc_sa_json2.") + raise RuntimeError("Falta la variable gdoc_sa_json.") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) diff --git a/parrafos_dinamicos_forma_pago.py b/parrafos_dinamicos_forma_pago.py index 3c6f8de..e241201 100644 --- a/parrafos_dinamicos_forma_pago.py +++ b/parrafos_dinamicos_forma_pago.py @@ -542,7 +542,7 @@ def _load_json(path): def get_services(credentials_json_path, impersonated_user): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get("type") != "service_account": - raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") + raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: @@ -955,9 +955,9 @@ try: _sv("gdoc_original_id", raw_id) - cred_path = _gvs("gdoc_sa_json2", "") + cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": - raise RuntimeError("Falta la variable gdoc_sa_json2.") + raise RuntimeError("Falta la variable gdoc_sa_json.") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) diff --git a/plan_de_pago_contado.py b/plan_de_pago_contado.py index 62f7725..0626531 100644 --- a/plan_de_pago_contado.py +++ b/plan_de_pago_contado.py @@ -338,7 +338,7 @@ def _load_json(path): #Función auxiliar para cargar un archivo JSON def get_services(credentials_json_path, impersonated_user): #Funci?n auxiliar para obtener los servicios de Google info = _load_json(credentials_json_path) # carga el archivo JSON if not isinstance(info, dict) or info.get("type") != "service_account": - raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") + raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: @@ -1223,9 +1223,9 @@ try: marker = _gvs("gdoc_marker", "TABLA_PROFORMA") _sv("dbg_marker_var", marker) - cred_path = _gvs("gdoc_sa_json2", "") + cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": - raise RuntimeError("Falta gdoc_sa_json2 (ruta al JSON de cuenta de servicio)") + raise RuntimeError("Falta gdoc_sa_json (ruta al JSON de cuenta de servicio)") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) if not os.path.exists(cred_path): diff --git a/plan_pagos_financiado_durante_obra.py b/plan_pagos_financiado_durante_obra.py index 321de43..92c03a5 100644 --- a/plan_pagos_financiado_durante_obra.py +++ b/plan_pagos_financiado_durante_obra.py @@ -491,7 +491,7 @@ def run(): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get("type") != "service_account": - raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") + raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") impersonated_user = (impersonated_user or "").strip() if not impersonated_user: @@ -1339,9 +1339,9 @@ def run(): # ============================================================ marker = _gvs("gdoc_marker", "TABLA_PROFORMA") - cred_path = _gvs("gdoc_sa_json2", "") + cred_path = _gvs("gdoc_sa_json", "") if cred_path == "": - raise RuntimeError("Falta gdoc_sa_json2") + raise RuntimeError("Falta gdoc_sa_json") if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) if not os.path.exists(cred_path): diff --git a/reemplaza_detalles_tecnicos_anexo2.py b/reemplaza_detalles_tecnicos_anexo2.py index edfff1d..3be0356 100644 --- a/reemplaza_detalles_tecnicos_anexo2.py +++ b/reemplaza_detalles_tecnicos_anexo2.py @@ -24,9 +24,6 @@ for k in list(sys.modules.keys()): from googleapiclient.discovery import build from google.oauth2 import service_account -from google.oauth2.credentials import Credentials -from google_auth_oauthlib.flow import InstalledAppFlow -from google.auth.transport.requests import Request as GRequest # ----------------------------------------------------------- # GOOGLE AUTH @@ -82,39 +79,23 @@ def _load_json(path): -def get_services(credentials_json_path, token_json_path): +def get_services(credentials_json_path, impersonated_user): info = _load_json(credentials_json_path) + if not isinstance(info, dict) or info.get("type") != "service_account": + raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.") - if isinstance(info, dict) and info.get("type") == "service_account": - creds = service_account.Credentials.from_service_account_file( - credentials_json_path, - scopes=SCOPES, - ) - docs = build("docs", "v1", credentials=creds, cache_discovery=False) - drive = build("drive", "v3", credentials=creds, cache_discovery=False) - return docs, drive, "service_account" - - creds = None - if os.path.exists(token_json_path): - creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) - - if (not creds) or (not creds.valid): - if creds and creds.expired and creds.refresh_token: - creds.refresh(GRequest()) - else: - flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) - try: - creds = flow.run_local_server(port=0) - except Exception: - creds = flow.run_console() - - os.makedirs(os.path.dirname(token_json_path), exist_ok=True) - with open(token_json_path, "w", encoding="utf-8") as f: - f.write(creds.to_json()) + impersonated_user = (impersonated_user or "").strip() + if not impersonated_user: + raise RuntimeError("Falta la variable gdoc_impersonated_user.") + creds = service_account.Credentials.from_service_account_file( + credentials_json_path, + scopes=SCOPES, + subject=impersonated_user, + ) docs = build("docs", "v1", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False) - return docs, drive, "oauth" + return docs, drive, "service_account_impersonated" @@ -497,16 +478,13 @@ try: if not os.path.isabs(cred_path): cred_path = os.path.join(base_dir, cred_path) if not os.path.exists(cred_path): - raise RuntimeError("No existe credentials.json: " + cred_path) + raise RuntimeError("No existe JSON de cuenta de servicio: " + cred_path) - token_path = _gvs("gdoc_token_json", "") - if token_path == "": - token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") - if not os.path.isabs(token_path): - token_path = os.path.join(base_dir, token_path) + impersonated_user = _gvs("gdoc_impersonated_user", "") - docs_service, drive_service, auth_mode = get_services(cred_path, token_path) + docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user) _sv("gdoc_auth_mode", auth_mode) + _sv("gdoc_impersonated_user_used", impersonated_user) doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) _sv("gdoc_id", doc_id) diff --git a/reemplazar_variables_contrato.py b/reemplazar_variables_contrato.py index 68865aa..409d835 100644 --- a/reemplazar_variables_contrato.py +++ b/reemplazar_variables_contrato.py @@ -23,7 +23,7 @@ if os.path.isdir(libs_dir) and libs_dir not in sys.path: SCOPES = {scopes_api_google} -CREDENTIALS_PATH = '{gdoc_sa_json2}' +CREDENTIALS_PATH = '{gdoc_sa_json}' IMPERSONATED_USER = '{gdoc_impersonated_user}' @@ -59,7 +59,7 @@ def _load_json(path): def get_google_services(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER): info = _load_json(credentials_json_path) if not isinstance(info, dict) or info.get('type') != 'service_account': - raise RuntimeError('gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.') + raise RuntimeError('gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.') impersonated_user = (impersonated_user or '').strip() if not impersonated_user: diff --git a/script_unificado_firma_digital.py b/script_unificado_firma_digital.py deleted file mode 100644 index a3ae38c..0000000 --- a/script_unificado_firma_digital.py +++ /dev/null @@ -1,393 +0,0 @@ -from __future__ import annotations - -import warnings -warnings.simplefilter("ignore", ResourceWarning) - -import os -import sys -import json -import re - -# -------------------- -# Boot libs (Rocketbot portable) -# -------------------- -base_dir = os.path.dirname(sys.executable) -libs_dir = os.path.join(base_dir, "py_libs", "py310") -sys.path.insert(0, libs_dir) - -for k in list(sys.modules.keys()): - if k == "pyparsing" or k.startswith("pyparsing."): - del sys.modules[k] - -from googleapiclient.discovery import build -from google.oauth2 import service_account -from google.oauth2.credentials import Credentials -from google_auth_oauthlib.flow import InstalledAppFlow -from google.auth.transport.requests import Request as GRequest - - -# ----------------------------------------------------------- -# GOOGLE AUTH -# ----------------------------------------------------------- -SCOPES = {scopes_api_google} - - -def _load_json(path): - with open(path, "r", encoding="utf-8") as f: - return json.load(f) - - -def get_services(credentials_json_path, token_json_path): - info = _load_json(credentials_json_path) - - if isinstance(info, dict) and info.get("type") == "service_account": - creds = service_account.Credentials.from_service_account_file( - credentials_json_path, - scopes=SCOPES, - ) - docs = build("docs", "v1", credentials=creds, cache_discovery=False) - drive = build("drive", "v3", credentials=creds, cache_discovery=False) - return docs, drive, "service_account" - - creds = None - if os.path.exists(token_json_path): - creds = Credentials.from_authorized_user_file(token_json_path, SCOPES) - - if (not creds) or (not creds.valid): - if creds and creds.expired and creds.refresh_token: - creds.refresh(GRequest()) - else: - flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES) - try: - creds = flow.run_local_server(port=0) - except Exception: - creds = flow.run_console() - - os.makedirs(os.path.dirname(token_json_path), exist_ok=True) - with open(token_json_path, "w", encoding="utf-8") as f: - f.write(creds.to_json()) - - docs = build("docs", "v1", credentials=creds, cache_discovery=False) - drive = build("drive", "v3", credentials=creds, cache_discovery=False) - return docs, drive, "oauth" - - -# ----------------------------------------------------------- -# ROCKETBOT VAR HELPERS -# ----------------------------------------------------------- -def _sv(name, value): - try: - SetVar(name, value) - except Exception: - pass - - -def _missing(v) -> bool: - if v is None: - return True - if isinstance(v, str): - s = v.strip() - return s == "" or s == "ERROR_NOT_VAR" - return False - - -def _gv(name, default=None): - try: - v = GetVar(name) - except Exception: - return default - return default if _missing(v) else v - - -def _gvs(name, default="") -> str: - v = _gv(name, default) - if v is None: - return default - s = str(v).strip() - return default if (s == "" or s == "ERROR_NOT_VAR") else s - - -# ----------------------------------------------------------- -# DRIVE / DOC HELPERS -# ----------------------------------------------------------- -def extract_doc_id_from_url(url): - m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") - return m.group(1) if m else "" - - - -def ensure_docs_api_compatible(drive_service, file_id: str): - meta = drive_service.files().get( - fileId=file_id, - fields="id,name,mimeType,shortcutDetails", - supportsAllDrives=True, - ).execute() - - if meta.get("mimeType") == "application/vnd.google-apps.shortcut": - target = (meta.get("shortcutDetails") or {}).get("targetId", "") - if not target: - raise RuntimeError("Es shortcut pero no trae targetId.") - file_id = target - meta = drive_service.files().get( - fileId=file_id, - fields="id,name,mimeType", - supportsAllDrives=True, - ).execute() - - if meta.get("mimeType") != "application/vnd.google-apps.document": - new_name = (meta.get("name") or "Documento") + " (Google Docs)" - converted = drive_service.files().copy( - fileId=file_id, - body={"name": new_name, "mimeType": "application/vnd.google-apps.document"}, - fields="id,name,mimeType", - supportsAllDrives=True, - ).execute() - return converted["id"], converted, True - - return file_id, meta, False - - - -def docs_get(docs_service, doc_id: str): - return docs_service.documents().get(documentId=doc_id).execute() - - -# ----------------------------------------------------------- -# BUSINESS LOGIC -# ----------------------------------------------------------- -def obtener_textos_reemplazo(firma_digital: str): - firma_digital = (firma_digital or "").strip().upper() - - if firma_digital == "V": - texto_clausula = ( - "Forman parte integrante e inseparable del presente contrato los Anexos " - "que se detallan a continuación, los cuales se consideran plenamente " - "conocidos y aceptados por las partes con la sola suscripción del " - "presente instrumento. " - ) - texto_firmas_secundarias = "" - else: - texto_clausula = ( - "Porción del presente contrato debiendo ser firmados como parte " - "integrante los siguientes ANEXOS:\n" - ) - texto_firmas_secundarias = ( - "___________________________________\t\t_________________________________\n" - "COMPRADOR\t\t\t\t\t\tVENDEDOR" - ) - - return texto_clausula, texto_firmas_secundarias - - - -def obtener_texto_e_indices(documento: dict): - texto = [] - indices = [] - - def recorrer_elementos(elementos): - for el in elementos or []: - parrafo = el.get("paragraph") - if parrafo: - for pe in parrafo.get("elements", []): - text_run = pe.get("textRun") - if not text_run: - continue - contenido = text_run.get("content", "") - start_index = pe.get("startIndex") - if start_index is None: - continue - for i, ch in enumerate(contenido): - texto.append(ch) - indices.append(start_index + i) - - tabla = el.get("table") - if tabla: - for fila in tabla.get("tableRows", []): - for celda in fila.get("tableCells", []): - recorrer_elementos(celda.get("content", [])) - - toc = el.get("tableOfContents") - if toc: - recorrer_elementos(toc.get("content", [])) - - recorrer_elementos((documento.get("body") or {}).get("content", [])) - return "".join(texto), indices - - - -def quitar_negrita_marker_y_siguiente_char(docs_service, document_id: str, marker_text: str): - documento = docs_get(docs_service, document_id) - texto_completo, indices = obtener_texto_e_indices(documento) - - if not marker_text: - return 0 - - requests = [] - pos = 0 - while True: - idx = texto_completo.find(marker_text, pos) - if idx == -1: - break - - start_index = indices[idx] - end_pos = idx + len(marker_text) - if end_pos < len(indices): - end_index = indices[end_pos] + 1 - else: - end_index = indices[end_pos - 1] + 1 - - requests.append({ - "updateTextStyle": { - "range": { - "startIndex": start_index, - "endIndex": end_index, - }, - "textStyle": { - "bold": False, - }, - "fields": "bold", - } - }) - pos = idx + len(marker_text) - - if requests: - docs_service.documents().batchUpdate( - documentId=document_id, - body={"requests": requests}, - ).execute() - - return len(requests) - - -def reemplazar_clausula_y_firmas(docs_service, document_id: str, firma_digital: str, - marker_clausula: str = "CLAUSULA_FIRMA_DIGITAL", - marker_firmas: str = "FIRMAS_SECUNDARIAS"): - texto_clausula, texto_firmas_secundarias = obtener_textos_reemplazo(firma_digital) - - requests = [ - { - "replaceAllText": { - "containsText": { - "text": marker_clausula, - "matchCase": True, - }, - "replaceText": texto_clausula, - } - }, - { - "replaceAllText": { - "containsText": { - "text": marker_firmas, - "matchCase": True, - }, - "replaceText": texto_firmas_secundarias, - } - }, - ] - - return docs_service.documents().batchUpdate( - documentId=document_id, - body={"requests": requests}, - ).execute() - - -# Rocketbot quirk FIX -globals().update(locals()) - - -# ----------------------------------------------------------- -# MAIN -# ----------------------------------------------------------- -try: - _sv("error_firma_digital", "") - - url_doc = _gvs("current_url", "") - if url_doc == "": - raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.") - - _sv("debug_current_url_var", url_doc) - - raw_id = extract_doc_id_from_url(url_doc) - if not raw_id: - raise RuntimeError("No pude extraer documentId de la URL.") - - _sv("gdoc_original_id", raw_id) - - cred_path = _gvs("gdoc_sa_json", "") - if cred_path == "": - raise RuntimeError("Falta la variable gdoc_sa_json.") - - if not os.path.isabs(cred_path): - cred_path = os.path.join(base_dir, cred_path) - - if not os.path.exists(cred_path): - raise RuntimeError("No existe credentials.json: " + cred_path) - - token_path = _gvs("gdoc_token_json", "") - if token_path == "": - token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json") - if not os.path.isabs(token_path): - token_path = os.path.join(base_dir, token_path) - - docs_service, drive_service, auth_mode = get_services(cred_path, token_path) - _sv("gdoc_auth_mode", auth_mode) - - doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) - _sv("gdoc_id", doc_id) - _sv("gdoc_converted", "1" if converted else "0") - _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit") - _sv("gdoc_name", meta_file.get("name", "")) - - doc_before = docs_get(docs_service, doc_id) - _sv("gdoc_revision_before", doc_before.get("revisionId", "")) - - firma_digital = _gvs("firma_digital", "") - marker_clausula = _gvs("marker_clausula_firma_digital", "CLAUSULA_FIRMA_DIGITAL") - marker_firmas = _gvs("marker_firmas_secundarias", "FIRMAS_SECUNDARIAS") - - if firma_digital == "": - raise RuntimeError("Falta la variable firma_digital.") - - _sv("firma_digital_usada", firma_digital) - _sv("marker_clausula_firma_digital_usado", marker_clausula) - _sv("marker_firmas_secundarias_usado", marker_firmas) - - ocurrencias_desnegritadas = quitar_negrita_marker_y_siguiente_char( - docs_service=docs_service, - document_id=doc_id, - marker_text=marker_clausula, - ) - _sv("marker_clausula_desnegritado_previo", str(ocurrencias_desnegritadas)) - - resultado = reemplazar_clausula_y_firmas( - docs_service=docs_service, - document_id=doc_id, - firma_digital=firma_digital, - marker_clausula=marker_clausula, - marker_firmas=marker_firmas, - ) - - replies = resultado.get("replies") or [] - ocurrencias_clausula = 0 - ocurrencias_firmas = 0 - - if len(replies) > 0: - ocurrencias_clausula = int(((replies[0] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0)) - if len(replies) > 1: - ocurrencias_firmas = int(((replies[1] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0)) - - _sv("reemplazos_clausula_firma_digital", str(ocurrencias_clausula)) - _sv("reemplazos_firmas_secundarias", str(ocurrencias_firmas)) - _sv("status_proceso", "Cláusula y firmas reemplazadas correctamente") - - print("Reemplazos realizados correctamente.") - print(resultado) - -except Exception as e: - import traceback - - error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}" - print(error_msg) - _sv("status_proceso", "ERROR") - _sv("error_firma_digital", error_msg) - raise