agrego el scriot responsable de personalizar el contrato dependiendo del tipo de firma ya se digital o no
This commit is contained in:
393
firma_digital_personalizacion.py
Normal file
393
firma_digital_personalizacion.py
Normal file
@@ -0,0 +1,393 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import warnings
|
||||
warnings.simplefilter("ignore", ResourceWarning)
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
|
||||
# --------------------
|
||||
# Boot libs (Rocketbot portable)
|
||||
# --------------------
|
||||
base_dir = os.path.dirname(sys.executable)
|
||||
libs_dir = os.path.join(base_dir, "py_libs", "py310")
|
||||
sys.path.insert(0, libs_dir)
|
||||
|
||||
for k in list(sys.modules.keys()):
|
||||
if k == "pyparsing" or k.startswith("pyparsing."):
|
||||
del sys.modules[k]
|
||||
|
||||
from googleapiclient.discovery import build
|
||||
from google.oauth2 import service_account
|
||||
from google.oauth2.credentials import Credentials
|
||||
from google_auth_oauthlib.flow import InstalledAppFlow
|
||||
from google.auth.transport.requests import Request as GRequest
|
||||
|
||||
|
||||
# -----------------------------------------------------------
|
||||
# GOOGLE AUTH
|
||||
# -----------------------------------------------------------
|
||||
SCOPES = {scopes_api_google}
|
||||
|
||||
|
||||
def _load_json(path):
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def get_services(credentials_json_path, token_json_path):
|
||||
info = _load_json(credentials_json_path)
|
||||
|
||||
if isinstance(info, dict) and info.get("type") == "service_account":
|
||||
creds = service_account.Credentials.from_service_account_file(
|
||||
credentials_json_path,
|
||||
scopes=SCOPES,
|
||||
)
|
||||
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
|
||||
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
|
||||
return docs, drive, "service_account"
|
||||
|
||||
creds = None
|
||||
if os.path.exists(token_json_path):
|
||||
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
|
||||
|
||||
if (not creds) or (not creds.valid):
|
||||
if creds and creds.expired and creds.refresh_token:
|
||||
creds.refresh(GRequest())
|
||||
else:
|
||||
flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
|
||||
try:
|
||||
creds = flow.run_local_server(port=0)
|
||||
except Exception:
|
||||
creds = flow.run_console()
|
||||
|
||||
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
|
||||
with open(token_json_path, "w", encoding="utf-8") as f:
|
||||
f.write(creds.to_json())
|
||||
|
||||
docs = build("docs", "v1", credentials=creds, cache_discovery=False)
|
||||
drive = build("drive", "v3", credentials=creds, cache_discovery=False)
|
||||
return docs, drive, "oauth"
|
||||
|
||||
|
||||
# -----------------------------------------------------------
|
||||
# ROCKETBOT VAR HELPERS
|
||||
# -----------------------------------------------------------
|
||||
def _sv(name, value):
|
||||
try:
|
||||
SetVar(name, value)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _missing(v) -> bool:
|
||||
if v is None:
|
||||
return True
|
||||
if isinstance(v, str):
|
||||
s = v.strip()
|
||||
return s == "" or s == "ERROR_NOT_VAR"
|
||||
return False
|
||||
|
||||
|
||||
def _gv(name, default=None):
|
||||
try:
|
||||
v = GetVar(name)
|
||||
except Exception:
|
||||
return default
|
||||
return default if _missing(v) else v
|
||||
|
||||
|
||||
def _gvs(name, default="") -> str:
|
||||
v = _gv(name, default)
|
||||
if v is None:
|
||||
return default
|
||||
s = str(v).strip()
|
||||
return default if (s == "" or s == "ERROR_NOT_VAR") else s
|
||||
|
||||
|
||||
# -----------------------------------------------------------
|
||||
# DRIVE / DOC HELPERS
|
||||
# -----------------------------------------------------------
|
||||
def extract_doc_id_from_url(url):
|
||||
m = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "")
|
||||
return m.group(1) if m else ""
|
||||
|
||||
|
||||
|
||||
def ensure_docs_api_compatible(drive_service, file_id: str):
|
||||
meta = drive_service.files().get(
|
||||
fileId=file_id,
|
||||
fields="id,name,mimeType,shortcutDetails",
|
||||
supportsAllDrives=True,
|
||||
).execute()
|
||||
|
||||
if meta.get("mimeType") == "application/vnd.google-apps.shortcut":
|
||||
target = (meta.get("shortcutDetails") or {}).get("targetId", "")
|
||||
if not target:
|
||||
raise RuntimeError("Es shortcut pero no trae targetId.")
|
||||
file_id = target
|
||||
meta = drive_service.files().get(
|
||||
fileId=file_id,
|
||||
fields="id,name,mimeType",
|
||||
supportsAllDrives=True,
|
||||
).execute()
|
||||
|
||||
if meta.get("mimeType") != "application/vnd.google-apps.document":
|
||||
new_name = (meta.get("name") or "Documento") + " (Google Docs)"
|
||||
converted = drive_service.files().copy(
|
||||
fileId=file_id,
|
||||
body={"name": new_name, "mimeType": "application/vnd.google-apps.document"},
|
||||
fields="id,name,mimeType",
|
||||
supportsAllDrives=True,
|
||||
).execute()
|
||||
return converted["id"], converted, True
|
||||
|
||||
return file_id, meta, False
|
||||
|
||||
|
||||
|
||||
def docs_get(docs_service, doc_id: str):
|
||||
return docs_service.documents().get(documentId=doc_id).execute()
|
||||
|
||||
|
||||
# -----------------------------------------------------------
|
||||
# BUSINESS LOGIC
|
||||
# -----------------------------------------------------------
|
||||
def obtener_textos_reemplazo(firma_digital: str):
|
||||
firma_digital = (firma_digital or "").strip().upper()
|
||||
|
||||
if firma_digital == "V":
|
||||
texto_clausula = (
|
||||
"Forman parte integrante e inseparable del presente contrato los Anexos "
|
||||
"que se detallan a continuación, los cuales se consideran plenamente "
|
||||
"conocidos y aceptados por las partes con la sola suscripción del "
|
||||
"presente instrumento. "
|
||||
)
|
||||
texto_firmas_secundarias = ""
|
||||
else:
|
||||
texto_clausula = (
|
||||
"Porción del presente contrato debiendo ser firmados como parte "
|
||||
"integrante los siguientes ANEXOS:\n"
|
||||
)
|
||||
texto_firmas_secundarias = (
|
||||
"___________________________________\t\t_________________________________\n"
|
||||
"COMPRADOR\t\t\t\t\t\tVENDEDOR"
|
||||
)
|
||||
|
||||
return texto_clausula, texto_firmas_secundarias
|
||||
|
||||
|
||||
|
||||
def obtener_texto_e_indices(documento: dict):
|
||||
texto = []
|
||||
indices = []
|
||||
|
||||
def recorrer_elementos(elementos):
|
||||
for el in elementos or []:
|
||||
parrafo = el.get("paragraph")
|
||||
if parrafo:
|
||||
for pe in parrafo.get("elements", []):
|
||||
text_run = pe.get("textRun")
|
||||
if not text_run:
|
||||
continue
|
||||
contenido = text_run.get("content", "")
|
||||
start_index = pe.get("startIndex")
|
||||
if start_index is None:
|
||||
continue
|
||||
for i, ch in enumerate(contenido):
|
||||
texto.append(ch)
|
||||
indices.append(start_index + i)
|
||||
|
||||
tabla = el.get("table")
|
||||
if tabla:
|
||||
for fila in tabla.get("tableRows", []):
|
||||
for celda in fila.get("tableCells", []):
|
||||
recorrer_elementos(celda.get("content", []))
|
||||
|
||||
toc = el.get("tableOfContents")
|
||||
if toc:
|
||||
recorrer_elementos(toc.get("content", []))
|
||||
|
||||
recorrer_elementos((documento.get("body") or {}).get("content", []))
|
||||
return "".join(texto), indices
|
||||
|
||||
|
||||
|
||||
def quitar_negrita_marker_y_siguiente_char(docs_service, document_id: str, marker_text: str):
|
||||
documento = docs_get(docs_service, document_id)
|
||||
texto_completo, indices = obtener_texto_e_indices(documento)
|
||||
|
||||
if not marker_text:
|
||||
return 0
|
||||
|
||||
requests = []
|
||||
pos = 0
|
||||
while True:
|
||||
idx = texto_completo.find(marker_text, pos)
|
||||
if idx == -1:
|
||||
break
|
||||
|
||||
start_index = indices[idx]
|
||||
end_pos = idx + len(marker_text)
|
||||
if end_pos < len(indices):
|
||||
end_index = indices[end_pos] + 1
|
||||
else:
|
||||
end_index = indices[end_pos - 1] + 1
|
||||
|
||||
requests.append({
|
||||
"updateTextStyle": {
|
||||
"range": {
|
||||
"startIndex": start_index,
|
||||
"endIndex": end_index,
|
||||
},
|
||||
"textStyle": {
|
||||
"bold": False,
|
||||
},
|
||||
"fields": "bold",
|
||||
}
|
||||
})
|
||||
pos = idx + len(marker_text)
|
||||
|
||||
if requests:
|
||||
docs_service.documents().batchUpdate(
|
||||
documentId=document_id,
|
||||
body={"requests": requests},
|
||||
).execute()
|
||||
|
||||
return len(requests)
|
||||
|
||||
|
||||
def reemplazar_clausula_y_firmas(docs_service, document_id: str, firma_digital: str,
|
||||
marker_clausula: str = "CLAUSULA_FIRMA_DIGITAL",
|
||||
marker_firmas: str = "FIRMAS_SECUNDARIAS"):
|
||||
texto_clausula, texto_firmas_secundarias = obtener_textos_reemplazo(firma_digital)
|
||||
|
||||
requests = [
|
||||
{
|
||||
"replaceAllText": {
|
||||
"containsText": {
|
||||
"text": marker_clausula,
|
||||
"matchCase": True,
|
||||
},
|
||||
"replaceText": texto_clausula,
|
||||
}
|
||||
},
|
||||
{
|
||||
"replaceAllText": {
|
||||
"containsText": {
|
||||
"text": marker_firmas,
|
||||
"matchCase": True,
|
||||
},
|
||||
"replaceText": texto_firmas_secundarias,
|
||||
}
|
||||
},
|
||||
]
|
||||
|
||||
return docs_service.documents().batchUpdate(
|
||||
documentId=document_id,
|
||||
body={"requests": requests},
|
||||
).execute()
|
||||
|
||||
|
||||
# Rocketbot quirk FIX
|
||||
globals().update(locals())
|
||||
|
||||
|
||||
# -----------------------------------------------------------
|
||||
# MAIN
|
||||
# -----------------------------------------------------------
|
||||
try:
|
||||
_sv("error_firma_digital", "")
|
||||
|
||||
url_doc = _gvs("current_url", "")
|
||||
if url_doc == "":
|
||||
raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.")
|
||||
|
||||
_sv("debug_current_url_var", url_doc)
|
||||
|
||||
raw_id = extract_doc_id_from_url(url_doc)
|
||||
if not raw_id:
|
||||
raise RuntimeError("No pude extraer documentId de la URL.")
|
||||
|
||||
_sv("gdoc_original_id", raw_id)
|
||||
|
||||
cred_path = _gvs("gdoc_sa_json", "")
|
||||
if cred_path == "":
|
||||
raise RuntimeError("Falta la variable gdoc_sa_json.")
|
||||
|
||||
if not os.path.isabs(cred_path):
|
||||
cred_path = os.path.join(base_dir, cred_path)
|
||||
|
||||
if not os.path.exists(cred_path):
|
||||
raise RuntimeError("No existe credentials.json: " + cred_path)
|
||||
|
||||
token_path = _gvs("gdoc_token_json", "")
|
||||
if token_path == "":
|
||||
token_path = os.path.join(base_dir, "credentials", "token_gdocs_drive.json")
|
||||
if not os.path.isabs(token_path):
|
||||
token_path = os.path.join(base_dir, token_path)
|
||||
|
||||
docs_service, drive_service, auth_mode = get_services(cred_path, token_path)
|
||||
_sv("gdoc_auth_mode", auth_mode)
|
||||
|
||||
doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id)
|
||||
_sv("gdoc_id", doc_id)
|
||||
_sv("gdoc_converted", "1" if converted else "0")
|
||||
_sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit")
|
||||
_sv("gdoc_name", meta_file.get("name", ""))
|
||||
|
||||
doc_before = docs_get(docs_service, doc_id)
|
||||
_sv("gdoc_revision_before", doc_before.get("revisionId", ""))
|
||||
|
||||
firma_digital = _gvs("firma_digital", "")
|
||||
marker_clausula = _gvs("marker_clausula_firma_digital", "CLAUSULA_FIRMA_DIGITAL")
|
||||
marker_firmas = _gvs("marker_firmas_secundarias", "FIRMAS_SECUNDARIAS")
|
||||
|
||||
if firma_digital == "":
|
||||
raise RuntimeError("Falta la variable firma_digital.")
|
||||
|
||||
_sv("firma_digital_usada", firma_digital)
|
||||
_sv("marker_clausula_firma_digital_usado", marker_clausula)
|
||||
_sv("marker_firmas_secundarias_usado", marker_firmas)
|
||||
|
||||
ocurrencias_desnegritadas = quitar_negrita_marker_y_siguiente_char(
|
||||
docs_service=docs_service,
|
||||
document_id=doc_id,
|
||||
marker_text=marker_clausula,
|
||||
)
|
||||
_sv("marker_clausula_desnegritado_previo", str(ocurrencias_desnegritadas))
|
||||
|
||||
resultado = reemplazar_clausula_y_firmas(
|
||||
docs_service=docs_service,
|
||||
document_id=doc_id,
|
||||
firma_digital=firma_digital,
|
||||
marker_clausula=marker_clausula,
|
||||
marker_firmas=marker_firmas,
|
||||
)
|
||||
|
||||
replies = resultado.get("replies") or []
|
||||
ocurrencias_clausula = 0
|
||||
ocurrencias_firmas = 0
|
||||
|
||||
if len(replies) > 0:
|
||||
ocurrencias_clausula = int(((replies[0] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0))
|
||||
if len(replies) > 1:
|
||||
ocurrencias_firmas = int(((replies[1] or {}).get("replaceAllText") or {}).get("occurrencesChanged", 0))
|
||||
|
||||
_sv("reemplazos_clausula_firma_digital", str(ocurrencias_clausula))
|
||||
_sv("reemplazos_firmas_secundarias", str(ocurrencias_firmas))
|
||||
_sv("status_proceso", "Cláusula y firmas reemplazadas correctamente")
|
||||
|
||||
print("Reemplazos realizados correctamente.")
|
||||
print(resultado)
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
|
||||
error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}"
|
||||
print(error_msg)
|
||||
_sv("status_proceso", "ERROR")
|
||||
_sv("error_firma_digital", error_msg)
|
||||
raise
|
||||
Reference in New Issue
Block a user