Compare commits

..

4 Commits

Author SHA1 Message Date
mabejoyb
33cbdd36b8 Corrección de variable gdoc_sa_json2 a gdoc_sa_json. 2026-05-13 19:11:37 -03:00
mabejoyb
dd1a529cdf Ajustes para unificar dependencia de current_url. 2026-05-13 18:22:54 -03:00
mabejoyb
252085029a Ajuste en script que envia el correo electronico con el contrato generado. 2026-05-13 14:24:02 -03:00
mabejoyb
d0c1f77ec3 Scripst ajustados para soportar cuenta de servicio 2026-05-13 14:20:55 -03:00
13 changed files with 180 additions and 1187 deletions

View File

@@ -68,7 +68,7 @@ def _load_json(path):
def get_services(credentials_json_path, impersonated_user): def get_services(credentials_json_path, impersonated_user):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get("type") != "service_account": if not isinstance(info, dict) or info.get("type") != "service_account":
raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.")
impersonated_user = (impersonated_user or "").strip() impersonated_user = (impersonated_user or "").strip()
if not impersonated_user: if not impersonated_user:
@@ -548,9 +548,9 @@ try:
url1 = _gvs("url_plano_producto", "") url1 = _gvs("url_plano_producto", "")
url2 = _gvs("url_plano_piso", "") url2 = _gvs("url_plano_piso", "")
cred_path = _gvs("gdoc_sa_json2", "") cred_path = _gvs("gdoc_sa_json", "")
if cred_path == "": if cred_path == "":
raise RuntimeError("Falta gdoc_sa_json2 (ruta al JSON de cuenta de servicio)") raise RuntimeError("Falta gdoc_sa_json (ruta al JSON de cuenta de servicio)")
if not os.path.isabs(cred_path): if not os.path.isabs(cred_path):
cred_path = os.path.join(base_dir, cred_path) cred_path = os.path.join(base_dir, cred_path)
if not os.path.exists(cred_path): if not os.path.exists(cred_path):
@@ -564,11 +564,9 @@ try:
docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user) docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user)
_sv("gdoc_auth_mode", auth_mode) _sv("gdoc_auth_mode", auth_mode)
# doc_id desde URL del navegador # doc_id desde current_url, que lo setea duplicar_plantilla_contrato.py
url_doc = "{current_url}" url_doc = _gvs("current_url", "")
#if url_doc == "": _sv("gdoc_url_used", url_doc)
# url_doc = _gvs("current_url", "")
#_sv("gdoc_url_used", url_doc)
doc_id = extract_doc_id_from_url(url_doc) doc_id = extract_doc_id_from_url(url_doc)
if doc_id == "": if doc_id == "":

View File

@@ -26,9 +26,6 @@ for k in list(sys.modules.keys()):
from googleapiclient.discovery import build from googleapiclient.discovery import build
from googleapiclient.errors import HttpError from googleapiclient.errors import HttpError
from google.oauth2 import service_account from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request as GRequest
# ----------------------------------------------------------- # -----------------------------------------------------------
@@ -42,39 +39,23 @@ def _load_json(path):
return json.load(f) return json.load(f)
def get_services(credentials_json_path, token_json_path): def get_services(credentials_json_path, impersonated_user):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get("type") != "service_account":
raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.")
impersonated_user = (impersonated_user or "").strip()
if not impersonated_user:
raise RuntimeError("Falta la variable gdoc_impersonated_user.")
if isinstance(info, dict) and info.get("type") == "service_account":
creds = service_account.Credentials.from_service_account_file( creds = service_account.Credentials.from_service_account_file(
credentials_json_path, credentials_json_path,
scopes=SCOPES, scopes=SCOPES,
subject=impersonated_user,
) )
docs = build("docs", "v1", credentials=creds, cache_discovery=False) docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "service_account" return docs, drive, "service_account_impersonated"
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if (not creds) or (not creds.valid):
if creds and creds.expired and creds.refresh_token:
creds.refresh(GRequest())
else:
flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, "w", encoding="utf-8") as f:
f.write(creds.to_json())
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "oauth"
# ----------------------------------------------------------- # -----------------------------------------------------------
@@ -699,16 +680,13 @@ try:
cred_path = os.path.join(base_dir, cred_path) cred_path = os.path.join(base_dir, cred_path)
if not os.path.exists(cred_path): if not os.path.exists(cred_path):
raise RuntimeError("No existe credentials.json: " + cred_path) raise RuntimeError("No existe JSON de cuenta de servicio: " + cred_path)
token_path = _gvs("gdoc_token_json", "") impersonated_user = _gvs("gdoc_impersonated_user", "")
if token_path == "":
token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json")
if not os.path.isabs(token_path):
token_path = os.path.join(base_dir, token_path)
docs_service, drive_service, auth_mode = get_services(cred_path, token_path) docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user)
_sv("gdoc_auth_mode", auth_mode) _sv("gdoc_auth_mode", auth_mode)
_sv("gdoc_impersonated_user_used", impersonated_user)
doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id)
_sv("gdoc_id", doc_id) _sv("gdoc_id", doc_id)

View File

@@ -26,9 +26,6 @@ for k in list(sys.modules.keys()):
from googleapiclient.discovery import build from googleapiclient.discovery import build
from googleapiclient.errors import HttpError from googleapiclient.errors import HttpError
from google.oauth2 import service_account from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request as GRequest
# ----------------------------------------------------------- # -----------------------------------------------------------
@@ -42,39 +39,23 @@ def _load_json(path):
return json.load(f) return json.load(f)
def get_services(credentials_json_path, token_json_path): def get_services(credentials_json_path, impersonated_user):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get("type") != "service_account":
raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.")
impersonated_user = (impersonated_user or "").strip()
if not impersonated_user:
raise RuntimeError("Falta la variable gdoc_impersonated_user.")
if isinstance(info, dict) and info.get("type") == "service_account":
creds = service_account.Credentials.from_service_account_file( creds = service_account.Credentials.from_service_account_file(
credentials_json_path, credentials_json_path,
scopes=SCOPES, scopes=SCOPES,
subject=impersonated_user,
) )
docs = build("docs", "v1", credentials=creds, cache_discovery=False) docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "service_account" return docs, drive, "service_account_impersonated"
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if (not creds) or (not creds.valid):
if creds and creds.expired and creds.refresh_token:
creds.refresh(GRequest())
else:
flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, "w", encoding="utf-8") as f:
f.write(creds.to_json())
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "oauth"
# ----------------------------------------------------------- # -----------------------------------------------------------
@@ -699,16 +680,13 @@ try:
cred_path = os.path.join(base_dir, cred_path) cred_path = os.path.join(base_dir, cred_path)
if not os.path.exists(cred_path): if not os.path.exists(cred_path):
raise RuntimeError("No existe credentials.json: " + cred_path) raise RuntimeError("No existe JSON de cuenta de servicio: " + cred_path)
token_path = _gvs("gdoc_token_json", "") impersonated_user = _gvs("gdoc_impersonated_user", "")
if token_path == "":
token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json")
if not os.path.isabs(token_path):
token_path = os.path.join(base_dir, token_path)
docs_service, drive_service, auth_mode = get_services(cred_path, token_path) docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user)
_sv("gdoc_auth_mode", auth_mode) _sv("gdoc_auth_mode", auth_mode)
_sv("gdoc_impersonated_user_used", impersonated_user)
doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id)
_sv("gdoc_id", doc_id) _sv("gdoc_id", doc_id)

View File

@@ -1,547 +0,0 @@
from __future__ import annotations
import warnings
warnings.simplefilter("ignore", ResourceWarning)
import os
import sys
import json
import re
import ast
import unicodedata
from typing import Any, Dict, List, Tuple
# --------------------
# Boot libs (Rocketbot portable)
# --------------------
base_dir = os.path.dirname(sys.executable)
libs_dir = os.path.join(base_dir, "py_libs", "py310")
sys.path.insert(0, libs_dir)
for k in list(sys.modules.keys()):
if k == "pyparsing" or k.startswith("pyparsing."):
del sys.modules[k]
from googleapiclient.discovery import build
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request as GRequest
# -----------------------------------------------------------
# GOOGLE AUTH
# -----------------------------------------------------------
# Este placeholder queda igual para Rocketbot.
SCOPES = {scopes_api_google}
MARCADOR_DEFAULT = "DETALLES_TECNICOS"
SIMPLE_BULLET_PRESET = "BULLET_DISC_CIRCLE_SQUARE"
# -----------------------------------------------------------
# ROCKETBOT VAR HELPERS
# -----------------------------------------------------------
def _sv(name, value):
try:
SetVar(name, value)
except Exception:
pass
def _missing(v) -> bool:
if v is None:
return True
if isinstance(v, str):
s = v.strip()
return s == "" or s == "ERROR_NOT_VAR"
return False
def _gv(name, default=None):
try:
v = GetVar(name)
except Exception:
return default
return default if _missing(v) else v
def _gvs(name, default="") -> str:
v = _gv(name, default)
if v is None:
return default
s = str(v).strip()
return default if (s == "" or s == "ERROR_NOT_VAR") else s
# -----------------------------------------------------------
# GOOGLE HELPERS
# -----------------------------------------------------------
def _load_json(path):
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def get_services(credentials_json_path, token_json_path):
info = _load_json(credentials_json_path)
if isinstance(info, dict) and info.get("type") == "service_account":
creds = service_account.Credentials.from_service_account_file(
credentials_json_path,
scopes=SCOPES,
)
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "service_account"
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if (not creds) or (not creds.valid):
if creds and creds.expired and creds.refresh_token:
creds.refresh(GRequest())
else:
flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, "w", encoding="utf-8") as f:
f.write(creds.to_json())
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "oauth"
def extract_doc_id_from_url(url: str) -> str:
m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "")
return m.group(1) if m else ""
def ensure_docs_api_compatible(drive_service, file_id: str):
meta = drive_service.files().get(
fileId=file_id,
fields="id,name,mimeType,shortcutDetails",
supportsAllDrives=True,
).execute()
if meta.get("mimeType") == "application/vnd.google-apps.shortcut":
target = (meta.get("shortcutDetails") or {}).get("targetId", "")
if not target:
raise RuntimeError("Es shortcut pero no trae targetId.")
file_id = target
meta = drive_service.files().get(
fileId=file_id,
fields="id,name,mimeType",
supportsAllDrives=True,
).execute()
if meta.get("mimeType") != "application/vnd.google-apps.document":
new_name = (meta.get("name") or "Documento") + " (Google Docs)"
converted = drive_service.files().copy(
fileId=file_id,
body={"name": new_name, "mimeType": "application/vnd.google-apps.document"},
fields="id,name,mimeType",
supportsAllDrives=True,
).execute()
return converted["id"], converted, True
return file_id, meta, False
def docs_get(docs_service, doc_id: str):
return docs_service.documents().get(documentId=doc_id).execute()
# -----------------------------------------------------------
# DOC STRUCTURE HELPERS
# -----------------------------------------------------------
def _norm_alnum(s: str) -> str:
if s is None:
return ""
s = unicodedata.normalize("NFKD", str(s)).lower()
out = []
for ch in s:
if unicodedata.category(ch) == "Mn":
continue
if ch.isalnum():
out.append(ch)
return "".join(out)
def _walk_tabs(tabs):
if not tabs:
return
for t in tabs:
yield t
for x in _walk_tabs(t.get("childTabs") or []):
yield x
def _get_tab_body_content(doc, tab_id):
tabs = doc.get("tabs")
if not tabs:
return (doc.get("body") or {}).get("content", [])
for t in _walk_tabs(tabs):
tid = ((t.get("tabProperties") or {}).get("tabId")) or ""
if tid == (tab_id or ""):
dtab = t.get("documentTab") or {}
return (dtab.get("body") or {}).get("content", [])
return []
def _iter_text_chars_from_body(doc, tab_id):
content = _get_tab_body_content(doc, tab_id)
def walk(content_list):
for el in content_list or []:
if not isinstance(el, dict):
continue
p = el.get("paragraph")
if p:
for pe in p.get("elements", []):
tr = pe.get("textRun")
if not tr:
continue
txt = tr.get("content", "")
st = pe.get("startIndex")
if st is None or txt is None:
continue
for i, ch in enumerate(txt):
yield int(st) + i, ch
continue
t = el.get("table")
if t:
for row in t.get("tableRows", []):
for cell in row.get("tableCells", []):
yield from walk(cell.get("content", []))
continue
toc = el.get("tableOfContents")
if toc:
yield from walk(toc.get("content", []))
continue
yield from walk(content)
def find_marker_in_body(doc, marker: str):
target = _norm_alnum(marker)
if not target:
return None
tabs = doc.get("tabs")
tab_ids = [None]
if tabs:
tab_ids = []
for t in _walk_tabs(tabs):
tid = ((t.get("tabProperties") or {}).get("tabId")) or ""
tab_ids.append(tid)
for tab_id in tab_ids:
norm_chars = []
norm_to_docidx = []
for doc_i, ch in _iter_text_chars_from_body(doc, tab_id):
if ch.isalnum():
norm_chars.append(ch.lower())
norm_to_docidx.append(int(doc_i))
norm_text = "".join(norm_chars)
pos = norm_text.find(target)
if pos != -1:
start_doc = norm_to_docidx[pos]
end_doc = norm_to_docidx[pos + len(target) - 1] + 1
return {
"tabId": tab_id,
"start": int(start_doc),
"end": int(end_doc),
}
return None
def _loc(index, tab_id=None):
d = {"index": int(index)}
if tab_id:
d["tabId"] = tab_id
return d
def _range(start, end, tab_id=None):
d = {"startIndex": int(start), "endIndex": int(end)}
if tab_id:
d["tabId"] = tab_id
return d
# -----------------------------------------------------------
# DATA PARSING / FORMAT HELPERS
# -----------------------------------------------------------
def normalizar_espacios(texto: str) -> str:
return re.sub(r"\s+", " ", str(texto or "").strip())
def parse_especificaciones(raw: Any) -> List[Dict[str, str]]:
if isinstance(raw, list):
return raw
if raw is None:
return []
texto = str(raw).strip()
if not texto:
return []
try:
data = json.loads(texto)
if isinstance(data, list):
return data
except Exception:
pass
try:
data = ast.literal_eval(texto)
if isinstance(data, list):
return data
except Exception:
pass
raise RuntimeError("No se pudo interpretar especificaciones_tecnicas como JSON ni como lista Python.")
def dividir_en_bullets(texto: str) -> List[str]:
texto = normalizar_espacios(texto)
if not texto:
return []
partes = re.split(r"(?<=[.!?])\s+(?=(?:[A-ZÁÉÍÓÚÑÜ]|\d))", texto)
return [p.strip() for p in partes if p.strip()]
def construir_bloque_formateado(especificaciones_tecnicas: List[Dict[str, str]]):
partes: List[str] = []
rangos_titulos: List[Tuple[int, int]] = []
rangos_bullets: List[Tuple[int, int]] = []
cursor = 0
items_validos = []
for item in especificaciones_tecnicas or []:
clave = normalizar_espacios((item or {}).get("clave", ""))
valor = (item or {}).get("valor", "")
if clave:
items_validos.append({"clave": clave, "valor": valor})
for i, item in enumerate(items_validos):
titulo_completo = item["clave"]
bullets = dividir_en_bullets(item.get("valor", ""))
ini_titulo = cursor
partes.append(titulo_completo)
cursor += len(titulo_completo)
fin_titulo = cursor
rangos_titulos.append((ini_titulo, fin_titulo))
partes.append("\n")
cursor += 1
for bullet in bullets:
ini_bullet = cursor
partes.append(bullet)
cursor += len(bullet)
fin_bullet = cursor
rangos_bullets.append((ini_bullet, fin_bullet))
partes.append("\n")
cursor += 1
if i < len(items_validos) - 1:
partes.append("\n")
cursor += 1
return "".join(partes), rangos_titulos, rangos_bullets
# -----------------------------------------------------------
# MAIN ACTION
# -----------------------------------------------------------
def reemplazar_detalles_tecnicos(
docs_service,
document_id: str,
marcador: str,
especificaciones_tecnicas: List[Dict[str, str]],
):
documento = docs_get(docs_service, document_id)
hit = find_marker_in_body(documento, marcador)
if not hit:
raise RuntimeError(f"No se encontró el marcador en el BODY del documento: {marcador}")
marcador_inicio = int(hit["start"])
marcador_fin = int(hit["end"])
tab_id = hit.get("tabId")
texto_insertar, rangos_titulos, rangos_bullets = construir_bloque_formateado(especificaciones_tecnicas)
if not texto_insertar.strip():
raise RuntimeError("especificaciones_tecnicas está vacío o no contiene elementos válidos.")
docs_service.documents().batchUpdate(
documentId=document_id,
body={
"requests": [
{
"deleteContentRange": {
"range": _range(marcador_inicio, marcador_fin, tab_id)
}
}
]
},
).execute()
docs_service.documents().batchUpdate(
documentId=document_id,
body={
"requests": [
{
"insertText": {
"location": _loc(marcador_inicio, tab_id),
"text": texto_insertar,
}
}
]
},
).execute()
requests_estilo = []
# 1) cada título queda como texto normal del documento, con su número real incluido y en negrita.
for ini, fin in rangos_titulos:
requests_estilo.append(
{
"updateTextStyle": {
"range": _range(marcador_inicio + ini, marcador_inicio + fin, tab_id),
"textStyle": {"bold": True},
"fields": "bold",
}
}
)
# 2) solo los detalles van con viñeta simple.
for ini, fin in rangos_bullets:
requests_estilo.append(
{
"createParagraphBullets": {
"range": _range(marcador_inicio + ini, marcador_inicio + fin + 1, tab_id),
"bulletPreset": SIMPLE_BULLET_PRESET,
}
}
)
if requests_estilo:
docs_service.documents().batchUpdate(
documentId=document_id,
body={"requests": requests_estilo},
).execute()
return {
"ok": True,
"document_id": document_id,
"marcador": marcador,
"titulos_formateados": len(rangos_titulos),
"bullets_formateados": len(rangos_bullets),
"texto_insertado": texto_insertar,
}
# Rocketbot quirk FIX
globals().update(locals())
# -----------------------------------------------------------
# MAIN
# -----------------------------------------------------------
try:
_sv("error_detalles_tecnicos", "")
url_doc = _gvs("current_url", "")
if url_doc == "":
raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.")
raw_id = extract_doc_id_from_url(url_doc)
if not raw_id:
raise RuntimeError("No pude extraer documentId de la URL.")
cred_path = _gvs("gdoc_sa_json", "")
if cred_path == "":
raise RuntimeError("Falta la variable gdoc_sa_json.")
if not os.path.isabs(cred_path):
cred_path = os.path.join(base_dir, cred_path)
if not os.path.exists(cred_path):
raise RuntimeError("No existe credentials.json: " + cred_path)
token_path = _gvs("gdoc_token_json", "")
if token_path == "":
token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json")
if not os.path.isabs(token_path):
token_path = os.path.join(base_dir, token_path)
docs_service, drive_service, auth_mode = get_services(cred_path, token_path)
_sv("gdoc_auth_mode", auth_mode)
doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id)
_sv("gdoc_id", doc_id)
_sv("gdoc_converted", "1" if converted else "0")
_sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit")
_sv("gdoc_name", meta_file.get("name", ""))
marcador = _gvs("marker_detalles_tecnicos", MARCADOR_DEFAULT)
_sv("gdoc_marker_detalles_tecnicos_used", marcador)
raw_especificaciones = globals().get("especificaciones_tecnicas", None)
if raw_especificaciones is None:
raw_especificaciones = _gv("especificaciones_tecnicas", None)
especificaciones_tecnicas = parse_especificaciones(raw_especificaciones)
resultado = reemplazar_detalles_tecnicos(
docs_service=docs_service,
document_id=doc_id,
marcador=marcador,
especificaciones_tecnicas=especificaciones_tecnicas,
)
_sv("status_proceso", "DETALLES_TECNICOS insertado correctamente")
_sv("detalles_tecnicos_insertados", "1")
_sv("detalles_tecnicos_resultado", json.dumps(resultado, ensure_ascii=False))
print("DETALLES_TECNICOS insertado correctamente")
print(json.dumps(resultado, ensure_ascii=False, indent=2))
except Exception as e:
import traceback
error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}"
print(error_msg)
_sv("status_proceso", "ERROR")
_sv("error_detalles_tecnicos", error_msg)
raise

View File

@@ -34,7 +34,7 @@ def rb_get_var(name, required=True, default=''):
SCOPES = {scopes_api_google} SCOPES = {scopes_api_google}
CREDENTIALS_PATH = rb_get_var('gdoc_sa_json2') CREDENTIALS_PATH = rb_get_var('gdoc_sa_json')
IMPERSONATED_USER = rb_get_var('gdoc_impersonated_user') IMPERSONATED_USER = rb_get_var('gdoc_impersonated_user')
@@ -46,7 +46,7 @@ def _load_json(path):
def get_drive_service(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER): def get_drive_service(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get('type') != 'service_account': if not isinstance(info, dict) or info.get('type') != 'service_account':
raise RuntimeError('gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.') raise RuntimeError('gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.')
impersonated_user = (impersonated_user or '').strip() impersonated_user = (impersonated_user or '').strip()
if not impersonated_user: if not impersonated_user:
@@ -148,6 +148,8 @@ try:
# Retornar valores (Rocketbot compatible) # Retornar valores (Rocketbot compatible)
try: try:
SetVar('current_url', doc_url) SetVar('current_url', doc_url)
SetVar('gdoc_id', copied_doc['id'])
SetVar('gdoc_url_final', doc_url)
SetVar('doc_copy_created', True) SetVar('doc_copy_created', True)
except NameError: except NameError:
pass # Si no está en Rocketbot, solo imprime pass # Si no está en Rocketbot, solo imprime

View File

@@ -1,12 +1,10 @@
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account from google.oauth2 import service_account
from googleapiclient.discovery import build from googleapiclient.discovery import build
import os import os
import sys import sys
import json import json
import re
import base64 import base64
from email.mime.base import MIMEBase from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
@@ -26,7 +24,31 @@ if os.path.isdir(libs_dir) and libs_dir not in sys.path:
SCOPES = {scopes_api_google} SCOPES = {scopes_api_google}
CREDENTIALS_PATH = '{gdoc_sa_json}' CREDENTIALS_PATH = '{gdoc_sa_json}'
TOKEN_PATH = '{gmail_token_json}' IMPERSONATED_USER = '{gdoc_impersonated_user}'
def rb_get_var(name, required=True, default=''):
try:
value = GetVar(name)
except NameError:
value = os.environ.get(name, default)
if value is None:
value = ''
value = str(value).strip()
if value == f'{{{name}}}':
value = ''
if required and not value:
raise RuntimeError(f'La variable Rocketbot "{name}" está vacía o no fue leída correctamente.')
return value
def extract_doc_id_from_url(url):
match = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "")
return match.group(1) if match else ""
def _load_json(path): def _load_json(path):
@@ -34,39 +56,17 @@ def _load_json(path):
return json.load(f) return json.load(f)
def get_drive_and_gmail_services(credentials_json_path=CREDENTIALS_PATH, token_json_path=TOKEN_PATH): def get_drive_and_gmail_services(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get('type') != 'service_account':
raise RuntimeError('gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.')
impersonated_user = (impersonated_user or '').strip()
if not impersonated_user:
raise RuntimeError('Falta la variable gdoc_impersonated_user.')
# Service account
if isinstance(info, dict) and info.get('type') == 'service_account':
creds = service_account.Credentials.from_service_account_file( creds = service_account.Credentials.from_service_account_file(
credentials_json_path, scopes=SCOPES) credentials_json_path, scopes=SCOPES, subject=impersonated_user)
drive_service = build(
'drive', 'v3', credentials=creds, cache_discovery=False)
gmail_service = build(
'gmail', 'v1', credentials=creds, cache_discovery=False)
return drive_service, gmail_service
# OAuth flow
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, 'w', encoding='utf-8') as f:
f.write(creds.to_json())
drive_service = build( drive_service = build(
'drive', 'v3', credentials=creds, cache_discovery=False) 'drive', 'v3', credentials=creds, cache_discovery=False)
gmail_service = build( gmail_service = build(
@@ -121,10 +121,20 @@ def send_email(gmail_service, to_emails, subject, body_text, attachment_bytes, a
drive_service, gmail_service = get_drive_and_gmail_services( drive_service, gmail_service = get_drive_and_gmail_services(
CREDENTIALS_PATH, TOKEN_PATH) CREDENTIALS_PATH, IMPERSONATED_USER)
# Id del documento de Google Drive a enviar por email # Id del documento de Google Drive a enviar por email
DOC_ID = '{gdoc_id}' CURRENT_URL = rb_get_var('current_url')
DOC_ID = extract_doc_id_from_url(CURRENT_URL)
if not DOC_ID:
raise RuntimeError('No pude extraer documentId desde current_url: ' + CURRENT_URL)
try:
SetVar('gdoc_id', DOC_ID)
SetVar('gdoc_url_used', CURRENT_URL)
except NameError:
pass
print(DOC_ID) print(DOC_ID)
# Obtener nombre del documento de Google # Obtener nombre del documento de Google

View File

@@ -37,7 +37,7 @@ def _load_json(path):
def get_services(credentials_json_path, impersonated_user): def get_services(credentials_json_path, impersonated_user):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get("type") != "service_account": if not isinstance(info, dict) or info.get("type") != "service_account":
raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.")
impersonated_user = (impersonated_user or "").strip() impersonated_user = (impersonated_user or "").strip()
if not impersonated_user: if not impersonated_user:
@@ -294,9 +294,9 @@ try:
_sv("gdoc_original_id", raw_id) _sv("gdoc_original_id", raw_id)
cred_path = _gvs("gdoc_sa_json2", "") cred_path = _gvs("gdoc_sa_json", "")
if cred_path == "": if cred_path == "":
raise RuntimeError("Falta la variable gdoc_sa_json2.") raise RuntimeError("Falta la variable gdoc_sa_json.")
if not os.path.isabs(cred_path): if not os.path.isabs(cred_path):
cred_path = os.path.join(base_dir, cred_path) cred_path = os.path.join(base_dir, cred_path)

View File

@@ -542,7 +542,7 @@ def _load_json(path):
def get_services(credentials_json_path, impersonated_user): def get_services(credentials_json_path, impersonated_user):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get("type") != "service_account": if not isinstance(info, dict) or info.get("type") != "service_account":
raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.")
impersonated_user = (impersonated_user or "").strip() impersonated_user = (impersonated_user or "").strip()
if not impersonated_user: if not impersonated_user:
@@ -955,9 +955,9 @@ try:
_sv("gdoc_original_id", raw_id) _sv("gdoc_original_id", raw_id)
cred_path = _gvs("gdoc_sa_json2", "") cred_path = _gvs("gdoc_sa_json", "")
if cred_path == "": if cred_path == "":
raise RuntimeError("Falta la variable gdoc_sa_json2.") raise RuntimeError("Falta la variable gdoc_sa_json.")
if not os.path.isabs(cred_path): if not os.path.isabs(cred_path):
cred_path = os.path.join(base_dir, cred_path) cred_path = os.path.join(base_dir, cred_path)

View File

@@ -338,7 +338,7 @@ def _load_json(path): #Función auxiliar para cargar un archivo JSON
def get_services(credentials_json_path, impersonated_user): #Funci?n auxiliar para obtener los servicios de Google def get_services(credentials_json_path, impersonated_user): #Funci?n auxiliar para obtener los servicios de Google
info = _load_json(credentials_json_path) # carga el archivo JSON info = _load_json(credentials_json_path) # carga el archivo JSON
if not isinstance(info, dict) or info.get("type") != "service_account": if not isinstance(info, dict) or info.get("type") != "service_account":
raise RuntimeError("gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.") raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.")
impersonated_user = (impersonated_user or "").strip() impersonated_user = (impersonated_user or "").strip()
if not impersonated_user: if not impersonated_user:
@@ -1223,9 +1223,9 @@ try:
marker = _gvs("gdoc_marker", "TABLA_PROFORMA") marker = _gvs("gdoc_marker", "TABLA_PROFORMA")
_sv("dbg_marker_var", marker) _sv("dbg_marker_var", marker)
cred_path = _gvs("gdoc_sa_json2", "") cred_path = _gvs("gdoc_sa_json", "")
if cred_path == "": if cred_path == "":
raise RuntimeError("Falta gdoc_sa_json2 (ruta al JSON de cuenta de servicio)") raise RuntimeError("Falta gdoc_sa_json (ruta al JSON de cuenta de servicio)")
if not os.path.isabs(cred_path): if not os.path.isabs(cred_path):
cred_path = os.path.join(base_dir, cred_path) cred_path = os.path.join(base_dir, cred_path)
if not os.path.exists(cred_path): if not os.path.exists(cred_path):
@@ -1239,8 +1239,6 @@ try:
docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user) docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user)
_sv("gdoc_auth_mode", auth_mode) _sv("gdoc_auth_mode", auth_mode)
url = _gvs("gdoc_url", "")
if url == "":
url = _gvs("current_url", "") url = _gvs("current_url", "")
_sv("gdoc_url_used", url) _sv("gdoc_url_used", url)

View File

@@ -485,41 +485,26 @@ def run():
with open(path, "r", encoding="utf-8") as f: with open(path, "r", encoding="utf-8") as f:
return json.load(f) return json.load(f)
def get_services(credentials_json_path, token_json_path): def get_services(credentials_json_path, impersonated_user):
from googleapiclient.discovery import build from googleapiclient.discovery import build
from google.oauth2 import service_account from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get("type") != "service_account":
raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.")
if isinstance(info, dict) and info.get("type") == "service_account": impersonated_user = (impersonated_user or "").strip()
creds = service_account.Credentials.from_service_account_file(credentials_json_path, scopes=SCOPES) if not impersonated_user:
raise RuntimeError("Falta la variable gdoc_impersonated_user.")
creds = service_account.Credentials.from_service_account_file(
credentials_json_path,
scopes=SCOPES,
subject=impersonated_user,
)
docs = build("docs", "v1", credentials=creds, cache_discovery=False) docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "service_account" return docs, drive, "service_account_impersonated"
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if (not creds) or (not creds.valid):
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, "w", encoding="utf-8") as f:
f.write(creds.to_json())
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "oauth"
def extract_doc_id_from_url(url): def extract_doc_id_from_url(url):
m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "") m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "")
@@ -1360,15 +1345,12 @@ def run():
if not os.path.isabs(cred_path): if not os.path.isabs(cred_path):
cred_path = os.path.join(base_dir, cred_path) cred_path = os.path.join(base_dir, cred_path)
if not os.path.exists(cred_path): if not os.path.exists(cred_path):
raise RuntimeError("No existe credentials.json: " + cred_path) raise RuntimeError("No existe JSON de cuenta de servicio: " + cred_path)
token_path = _gvs("gdoc_token_json", "") impersonated_user = _gvs("gdoc_impersonated_user", "")
if token_path == "":
token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json")
if not os.path.isabs(token_path):
token_path = os.path.join(base_dir, token_path)
docs_service, drive_service, _ = get_services(cred_path, token_path) docs_service, drive_service, _ = get_services(cred_path, impersonated_user)
_sv("gdoc_impersonated_user_used", impersonated_user)
url = _gvs("current_url", "") url = _gvs("current_url", "")
if url == "": if url == "":

View File

@@ -24,9 +24,6 @@ for k in list(sys.modules.keys()):
from googleapiclient.discovery import build from googleapiclient.discovery import build
from google.oauth2 import service_account from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request as GRequest
# ----------------------------------------------------------- # -----------------------------------------------------------
# GOOGLE AUTH # GOOGLE AUTH
@@ -82,39 +79,23 @@ def _load_json(path):
def get_services(credentials_json_path, token_json_path): def get_services(credentials_json_path, impersonated_user):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get("type") != "service_account":
raise RuntimeError("gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.")
impersonated_user = (impersonated_user or "").strip()
if not impersonated_user:
raise RuntimeError("Falta la variable gdoc_impersonated_user.")
if isinstance(info, dict) and info.get("type") == "service_account":
creds = service_account.Credentials.from_service_account_file( creds = service_account.Credentials.from_service_account_file(
credentials_json_path, credentials_json_path,
scopes=SCOPES, scopes=SCOPES,
subject=impersonated_user,
) )
docs = build("docs", "v1", credentials=creds, cache_discovery=False) docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False) drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "service_account" return docs, drive, "service_account_impersonated"
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if (not creds) or (not creds.valid):
if creds and creds.expired and creds.refresh_token:
creds.refresh(GRequest())
else:
flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, "w", encoding="utf-8") as f:
f.write(creds.to_json())
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "oauth"
@@ -497,16 +478,13 @@ try:
if not os.path.isabs(cred_path): if not os.path.isabs(cred_path):
cred_path = os.path.join(base_dir, cred_path) cred_path = os.path.join(base_dir, cred_path)
if not os.path.exists(cred_path): if not os.path.exists(cred_path):
raise RuntimeError("No existe credentials.json: " + cred_path) raise RuntimeError("No existe JSON de cuenta de servicio: " + cred_path)
token_path = _gvs("gdoc_token_json", "") impersonated_user = _gvs("gdoc_impersonated_user", "")
if token_path == "":
token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json")
if not os.path.isabs(token_path):
token_path = os.path.join(base_dir, token_path)
docs_service, drive_service, auth_mode = get_services(cred_path, token_path) docs_service, drive_service, auth_mode = get_services(cred_path, impersonated_user)
_sv("gdoc_auth_mode", auth_mode) _sv("gdoc_auth_mode", auth_mode)
_sv("gdoc_impersonated_user_used", impersonated_user)
doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id) doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id)
_sv("gdoc_id", doc_id) _sv("gdoc_id", doc_id)

View File

@@ -1,11 +1,9 @@
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account from google.oauth2 import service_account
from googleapiclient.discovery import build from googleapiclient.discovery import build
import os import os
import sys import sys
import json import json
import re
global Request, InstalledAppFlow, Credentials, service_account, build, os, sys, json, LISTA_BUSCAR_REEMPLAZAR, base_dir, libs_dir, SCOPES, CREDENTIALS_PATH, TOKEN_PATH, _load_json, get_google_services, replace_vars_doc, drive_service, docs_service, gdoc_id, lista_oficial_reemplazar, success global Request, InstalledAppFlow, Credentials, service_account, build, os, sys, json, LISTA_BUSCAR_REEMPLAZAR, base_dir, libs_dir, SCOPES, CREDENTIALS_PATH, TOKEN_PATH, _load_json, get_google_services, replace_vars_doc, drive_service, docs_service, gdoc_id, lista_oficial_reemplazar, success
@@ -26,7 +24,31 @@ SCOPES = {scopes_api_google}
CREDENTIALS_PATH = '{gdoc_sa_json}' CREDENTIALS_PATH = '{gdoc_sa_json}'
TOKEN_PATH = '{gdoc_token_json}' IMPERSONATED_USER = '{gdoc_impersonated_user}'
def rb_get_var(name, required=True, default=''):
try:
value = GetVar(name)
except NameError:
value = os.environ.get(name, default)
if value is None:
value = ''
value = str(value).strip()
if value == f'{{{name}}}':
value = ''
if required and not value:
raise RuntimeError(f'La variable Rocketbot "{name}" está vacía o no fue leída correctamente.')
return value
def extract_doc_id_from_url(url):
match = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "")
return match.group(1) if match else ""
def _load_json(path): def _load_json(path):
@@ -34,39 +56,17 @@ def _load_json(path):
return json.load(f) return json.load(f)
def get_google_services(credentials_json_path=CREDENTIALS_PATH, token_json_path=TOKEN_PATH): def get_google_services(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER):
info = _load_json(credentials_json_path) info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get('type') != 'service_account':
raise RuntimeError('gdoc_sa_json debe apuntar a un JSON de cuenta de servicio.')
impersonated_user = (impersonated_user or '').strip()
if not impersonated_user:
raise RuntimeError('Falta la variable gdoc_impersonated_user.')
# Service account
if isinstance(info, dict) and info.get('type') == 'service_account':
creds = service_account.Credentials.from_service_account_file( creds = service_account.Credentials.from_service_account_file(
credentials_json_path, scopes=SCOPES) credentials_json_path, scopes=SCOPES, subject=impersonated_user)
drive_service = build(
'drive', 'v3', credentials=creds, cache_discovery=False)
docs_service = build(
'docs', 'v1', credentials=creds, cache_discovery=False)
return drive_service, docs_service
# OAuth flow
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, 'w', encoding='utf-8') as f:
f.write(creds.to_json())
drive_service = build( drive_service = build(
'drive', 'v3', credentials=creds, cache_discovery=False) 'drive', 'v3', credentials=creds, cache_discovery=False)
docs_service = build( docs_service = build(
@@ -114,14 +114,23 @@ def replace_vars_doc(docs_service, gdoc_id, replacements_values):
# Iniciar servicios (Solo Drive y Docs) # Iniciar servicios (Solo Drive y Docs)
drive_service, docs_service = get_google_services(CREDENTIALS_PATH, TOKEN_PATH) drive_service, docs_service = get_google_services(CREDENTIALS_PATH, IMPERSONATED_USER)
# Variables de entrada (esto usualmente viene de Rocketbot via GetVar) # Variables de entrada (esto usualmente viene de Rocketbot via GetVar)
gdoc_id = '{gdoc_id}' current_url = rb_get_var('current_url')
gdoc_id = extract_doc_id_from_url(current_url)
if not gdoc_id:
raise RuntimeError('No pude extraer documentId desde current_url: ' + current_url)
lista_oficial_reemplazar = {lista_oficial_reemplazar} lista_oficial_reemplazar = {lista_oficial_reemplazar}
try: try:
try:
SetVar('gdoc_id', gdoc_id)
SetVar('gdoc_url_used', current_url)
except NameError:
pass
# Ejecutar reemplazo si se cuenta con la información # Ejecutar reemplazo si se cuenta con la información
if gdoc_id and lista_oficial_reemplazar: if gdoc_id and lista_oficial_reemplazar:
success = replace_vars_doc(docs_service, gdoc_id, lista_oficial_reemplazar) success = replace_vars_doc(docs_service, gdoc_id, lista_oficial_reemplazar)

View File

@@ -1,393 +0,0 @@
from __future__ import annotations
import warnings
warnings.simplefilter("ignore", ResourceWarning)
import os
import sys
import json
import re
# --------------------
# Boot libs (Rocketbot portable)
# --------------------
base_dir = os.path.dirname(sys.executable)
libs_dir = os.path.join(base_dir, "py_libs", "py310")
sys.path.insert(0, libs_dir)
for k in list(sys.modules.keys()):
if k == "pyparsing" or k.startswith("pyparsing."):
del sys.modules[k]
from googleapiclient.discovery import build
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request as GRequest
# -----------------------------------------------------------
# GOOGLE AUTH
# -----------------------------------------------------------
SCOPES = {scopes_api_google}
def _load_json(path):
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def get_services(credentials_json_path, token_json_path):
info = _load_json(credentials_json_path)
if isinstance(info, dict) and info.get("type") == "service_account":
creds = service_account.Credentials.from_service_account_file(
credentials_json_path,
scopes=SCOPES,
)
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "service_account"
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if (not creds) or (not creds.valid):
if creds and creds.expired and creds.refresh_token:
creds.refresh(GRequest())
else:
flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, "w", encoding="utf-8") as f:
f.write(creds.to_json())
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
return docs, drive, "oauth"
# -----------------------------------------------------------
# ROCKETBOT VAR HELPERS
# -----------------------------------------------------------
def _sv(name, value):
try:
SetVar(name, value)
except Exception:
pass
def _missing(v) -> bool:
if v is None:
return True
if isinstance(v, str):
s = v.strip()
return s == "" or s == "ERROR_NOT_VAR"
return False
def _gv(name, default=None):
try:
v = GetVar(name)
except Exception:
return default
return default if _missing(v) else v
def _gvs(name, default="") -> str:
v = _gv(name, default)
if v is None:
return default
s = str(v).strip()
return default if (s == "" or s == "ERROR_NOT_VAR") else s
# -----------------------------------------------------------
# DRIVE / DOC HELPERS
# -----------------------------------------------------------
def extract_doc_id_from_url(url):
m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "")
return m.group(1) if m else ""
def ensure_docs_api_compatible(drive_service, file_id: str):
meta = drive_service.files().get(
fileId=file_id,
fields="id,name,mimeType,shortcutDetails",
supportsAllDrives=True,
).execute()
if meta.get("mimeType") == "application/vnd.google-apps.shortcut":
target = (meta.get("shortcutDetails") or {}).get("targetId", "")
if not target:
raise RuntimeError("Es shortcut pero no trae targetId.")
file_id = target
meta = drive_service.files().get(
fileId=file_id,
fields="id,name,mimeType",
supportsAllDrives=True,
).execute()
if meta.get("mimeType") != "application/vnd.google-apps.document":
new_name = (meta.get("name") or "Documento") + " (Google Docs)"
converted = drive_service.files().copy(
fileId=file_id,
body={"name": new_name, "mimeType": "application/vnd.google-apps.document"},
fields="id,name,mimeType",
supportsAllDrives=True,
).execute()
return converted["id"], converted, True
return file_id, meta, False
def docs_get(docs_service, doc_id: str):
return docs_service.documents().get(documentId=doc_id).execute()
# -----------------------------------------------------------
# BUSINESS LOGIC
# -----------------------------------------------------------
def obtener_textos_reemplazo(firma_digital: str):
firma_digital = (firma_digital or "").strip().upper()
if firma_digital == "V":
texto_clausula = (
"Forman parte integrante e inseparable del presente contrato los Anexos "
"que se detallan a continuación, los cuales se consideran plenamente "
"conocidos y aceptados por las partes con la sola suscripción del "
"presente instrumento. "
)
texto_firmas_secundarias = ""
else:
texto_clausula = (
"Porción del presente contrato debiendo ser firmados como parte "
"integrante los siguientes ANEXOS:\n"
)
texto_firmas_secundarias = (
"___________________________________\t\t_________________________________\n"
"COMPRADOR\t\t\t\t\t\tVENDEDOR"
)
return texto_clausula, texto_firmas_secundarias
def obtener_texto_e_indices(documento: dict):
texto = []
indices = []
def recorrer_elementos(elementos):
for el in elementos or []:
parrafo = el.get("paragraph")
if parrafo:
for pe in parrafo.get("elements", []):
text_run = pe.get("textRun")
if not text_run:
continue
contenido = text_run.get("content", "")
start_index = pe.get("startIndex")
if start_index is None:
continue
for i, ch in enumerate(contenido):
texto.append(ch)
indices.append(start_index + i)
tabla = el.get("table")
if tabla:
for fila in tabla.get("tableRows", []):
for celda in fila.get("tableCells", []):
recorrer_elementos(celda.get("content", []))
toc = el.get("tableOfContents")
if toc:
recorrer_elementos(toc.get("content", []))
recorrer_elementos((documento.get("body") or {}).get("content", []))
return "".join(texto), indices
def quitar_negrita_marker_y_siguiente_char(docs_service, document_id: str, marker_text: str):
documento = docs_get(docs_service, document_id)
texto_completo, indices = obtener_texto_e_indices(documento)
if not marker_text:
return 0
requests = []
pos = 0
while True:
idx = texto_completo.find(marker_text, pos)
if idx == -1:
break
start_index = indices[idx]
end_pos = idx + len(marker_text)
if end_pos < len(indices):
end_index = indices[end_pos] + 1
else:
end_index = indices[end_pos - 1] + 1
requests.append({
"updateTextStyle": {
"range": {
"startIndex": start_index,
"endIndex": end_index,
},
"textStyle": {
"bold": False,
},
"fields": "bold",
}
})
pos = idx + len(marker_text)
if requests:
docs_service.documents().batchUpdate(
documentId=document_id,
body={"requests": requests},
).execute()
return len(requests)
def reemplazar_clausula_y_firmas(docs_service, document_id: str, firma_digital: str,
marker_clausula: str = "CLAUSULA_FIRMA_DIGITAL",
marker_firmas: str = "FIRMAS_SECUNDARIAS"):
texto_clausula, texto_firmas_secundarias = obtener_textos_reemplazo(firma_digital)
requests = [
{
"replaceAllText": {
"containsText": {
"text": marker_clausula,
"matchCase": True,
},
"replaceText": texto_clausula,
}
},
{
"replaceAllText": {
"containsText": {
"text": marker_firmas,
"matchCase": True,
},
"replaceText": texto_firmas_secundarias,
}
},
]
return docs_service.documents().batchUpdate(
documentId=document_id,
body={"requests": requests},
).execute()
# Rocketbot quirk FIX
globals().update(locals())
# -----------------------------------------------------------
# MAIN
# -----------------------------------------------------------
try:
_sv("error_firma_digital", "")
url_doc = _gvs("current_url", "")
if url_doc == "":
raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.")
_sv("debug_current_url_var", url_doc)
raw_id = extract_doc_id_from_url(url_doc)
if not raw_id:
raise RuntimeError("No pude extraer documentId de la URL.")
_sv("gdoc_original_id", raw_id)
cred_path = _gvs("gdoc_sa_json", "")
if cred_path == "":
raise RuntimeError("Falta la variable gdoc_sa_json.")
if not os.path.isabs(cred_path):
cred_path = os.path.join(base_dir, cred_path)
if not os.path.exists(cred_path):
raise RuntimeError("No existe credentials.json: " + cred_path)
token_path = _gvs("gdoc_token_json", "")
if token_path == "":
token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json")
if not os.path.isabs(token_path):
token_path = os.path.join(base_dir, token_path)
docs_service, drive_service, auth_mode = get_services(cred_path, token_path)
_sv("gdoc_auth_mode", auth_mode)
doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id)
_sv("gdoc_id", doc_id)
_sv("gdoc_converted", "1" if converted else "0")
_sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit")
_sv("gdoc_name", meta_file.get("name", ""))
doc_before = docs_get(docs_service, doc_id)
_sv("gdoc_revision_before", doc_before.get("revisionId", ""))
firma_digital = _gvs("firma_digital", "")
marker_clausula = _gvs("marker_clausula_firma_digital", "CLAUSULA_FIRMA_DIGITAL")
marker_firmas = _gvs("marker_firmas_secundarias", "FIRMAS_SECUNDARIAS")
if firma_digital == "":
raise RuntimeError("Falta la variable firma_digital.")
_sv("firma_digital_usada", firma_digital)
_sv("marker_clausula_firma_digital_usado", marker_clausula)
_sv("marker_firmas_secundarias_usado", marker_firmas)
ocurrencias_desnegritadas = quitar_negrita_marker_y_siguiente_char(
docs_service=docs_service,
document_id=doc_id,
marker_text=marker_clausula,
)
_sv("marker_clausula_desnegritado_previo", str(ocurrencias_desnegritadas))
resultado = reemplazar_clausula_y_firmas(
docs_service=docs_service,
document_id=doc_id,
firma_digital=firma_digital,
marker_clausula=marker_clausula,
marker_firmas=marker_firmas,
)
replies = resultado.get("replies") or []
ocurrencias_clausula = 0
ocurrencias_firmas = 0
if len(replies) > 0:
ocurrencias_clausula = int(((replies[0] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0))
if len(replies) > 1:
ocurrencias_firmas = int(((replies[1] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0))
_sv("reemplazos_clausula_firma_digital", str(ocurrencias_clausula))
_sv("reemplazos_firmas_secundarias", str(ocurrencias_firmas))
_sv("status_proceso", "Cláusula y firmas reemplazadas correctamente")
print("Reemplazos realizados correctamente.")
print(resultado)
except Exception as e:
import traceback
error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}"
print(error_msg)
_sv("status_proceso", "ERROR")
_sv("error_firma_digital", error_msg)
raise