# Files
# scripts_zubabot/detalles_tecnicos.py
# dulcefigueredo a35e86c318 Upload files to "/"
# Adds the script that replaces a marker for Annex II on inventories.
# 2026-04-22 19:35:58 +00:00
#
# 548 lines
# 16 KiB
# Python

from __future__ import annotations
import warnings
warnings.simplefilter("ignore", ResourceWarning)
import os
import sys
import json
import re
import ast
import unicodedata
from typing import Any, Dict, List, Tuple
# --------------------
# Boot libs (Rocketbot portable)
# --------------------
# Put the bundled library folder (next to the running interpreter) first on the
# import path, ahead of any other location.
base_dir = os.path.dirname(sys.executable)
libs_dir = os.path.join(base_dir, "py_libs", "py310")
sys.path.insert(0, libs_dir)
# Forget every already-imported pyparsing module.
# NOTE(review): presumably so a later import picks up the copy under libs_dir — confirm.
for k in [
    name
    for name in sys.modules
    if name == "pyparsing" or name.startswith("pyparsing.")
]:
    del sys.modules[k]
from googleapiclient.discovery import build
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request as GRequest
# -----------------------------------------------------------
# GOOGLE AUTH
# -----------------------------------------------------------
# This placeholder is left exactly as written for Rocketbot.
# NOTE(review): presumably Rocketbot substitutes {scopes_api_google} with the
# real scope list before the script runs — confirm. Read as plain Python this
# is a set literal around an undefined name.
SCOPES = {scopes_api_google}
# Marker searched for in the document when marker_detalles_tecnicos is not set.
MARCADOR_DEFAULT = "DETALLES_TECNICOS"
# Bullet preset passed to createParagraphBullets for the detail lines.
SIMPLE_BULLET_PRESET = "BULLET_DISC_CIRCLE_SQUARE"
# -----------------------------------------------------------
# ROCKETBOT VAR HELPERS
# -----------------------------------------------------------
def _sv(name, value):
    """Best-effort write of *value* to the host variable *name*.

    ``SetVar`` is not defined in this file (presumably injected by Rocketbot —
    confirm). Any exception it raises, including it being undefined, is
    swallowed on purpose so that reporting a status never aborts the script.
    """
    try:
        SetVar(name, value)
    except Exception:
        # Deliberately ignored: setting a variable is optional bookkeeping.
        return None
    return None
def _missing(v) -> bool:
    """Return True when *v* counts as "no value".

    ``None`` is missing. A string is missing when, after stripping surrounding
    whitespace, it is empty or equals the sentinel ``"ERROR_NOT_VAR"``. Every
    other value (including 0 and empty containers) is considered present.
    """
    if isinstance(v, str):
        return v.strip() in ("", "ERROR_NOT_VAR")
    return v is None
def _gv(name, default=None):
    """Read the host variable *name*, falling back to *default*.

    *default* is returned when ``GetVar`` raises (it is not defined in this
    file, so that includes it being unavailable) or when the value it returns
    is considered missing by ``_missing``. Otherwise the raw value is returned
    unchanged.
    """
    try:
        value = GetVar(name)
    except Exception:
        return default
    if _missing(value):
        return default
    return value
def _gvs(name, default="") -> str:
    """Read the host variable *name* as a stripped string.

    The value comes from ``_gv``. It is converted with ``str`` and stripped;
    *default* is returned when the value is ``None`` or the stripped text is
    empty or equals ``"ERROR_NOT_VAR"``.
    """
    value = _gv(name, default)
    if value is None:
        return default
    text = str(value).strip()
    if text in ("", "ERROR_NOT_VAR"):
        return default
    return text
# -----------------------------------------------------------
# GOOGLE HELPERS
# -----------------------------------------------------------
def _load_json(path):
    """Read the UTF-8 file at *path* and return its parsed JSON content."""
    with open(path, "r", encoding="utf-8") as handle:
        raw = handle.read()
    return json.loads(raw)
def get_services(credentials_json_path, token_json_path):
    """Build Google Docs and Drive clients from a credentials JSON file.

    If the JSON object declares ``"type": "service_account"`` the file is
    loaded as service-account credentials. Otherwise it is treated as an OAuth
    client-secrets file: a token stored at *token_json_path* is reused (and
    refreshed when expired and refreshable); failing that an interactive
    authorization flow is run. Whenever the stored token was not already valid,
    the resulting credentials are written back to *token_json_path*.

    Args:
        credentials_json_path: Path to the service-account or client-secrets JSON.
        token_json_path: Path of the cached OAuth user token (OAuth mode only).

    Returns:
        A ``(docs, drive, auth_mode)`` tuple where *auth_mode* is
        ``"service_account"`` or ``"oauth"``.
    """
    info = _load_json(credentials_json_path)
    if isinstance(info, dict) and info.get("type") == "service_account":
        creds = service_account.Credentials.from_service_account_file(
            credentials_json_path,
            scopes=SCOPES,
        )
        auth_mode = "service_account"
    else:
        creds = None
        if os.path.exists(token_json_path):
            creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
        if (not creds) or (not creds.valid):
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(GRequest())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(credentials_json_path, SCOPES)
                try:
                    creds = flow.run_local_server(port=0)
                except Exception:
                    # run_console() is absent from newer google-auth-oauthlib
                    # releases; when it is missing, surface the original
                    # local-server failure instead of an AttributeError.
                    run_console = getattr(flow, "run_console", None)
                    if run_console is None:
                        raise
                    creds = run_console()
            # A bare file name has an empty dirname, and os.makedirs("") raises,
            # so only create the folder when there is one to create.
            token_dir = os.path.dirname(token_json_path)
            if token_dir:
                os.makedirs(token_dir, exist_ok=True)
            with open(token_json_path, "w", encoding="utf-8") as f:
                f.write(creds.to_json())
        auth_mode = "oauth"
    docs = build("docs", "v1", credentials=creds, cache_discovery=False)
    drive = build("drive", "v3", credentials=creds, cache_discovery=False)
    return docs, drive, auth_mode
def extract_doc_id_from_url(url: str) -> str:
    """Return the id that follows ``/document/d/`` in *url*, or ``""``.

    A falsy *url* (``None`` or empty) is treated as an empty string.
    """
    match = re.search(r"/document/d/([a-zA-Z0-9_-]+)", url or "")
    if match is None:
        return ""
    return match.group(1)
def ensure_docs_api_compatible(drive_service, file_id: str):
    """Resolve *file_id* to a file whose MIME type is a Google Docs document.

    A Drive shortcut is followed to its target first. If the resulting file is
    not a Google Docs document, a copy is requested with the Google Docs MIME
    type and the name ``"<name> (Google Docs)"``.

    Returns:
        ``(file_id, metadata, converted)`` — the id to use, the metadata dict
        returned by Drive for it, and True when a converted copy was created.

    Raises:
        RuntimeError: The file is a shortcut that carries no ``targetId``.
    """
    gdoc_mime = "application/vnd.google-apps.document"
    shortcut_mime = "application/vnd.google-apps.shortcut"

    def fetch_meta(target_id, fields):
        # Single place for the files().get call used for both lookups.
        return drive_service.files().get(
            fileId=target_id,
            fields=fields,
            supportsAllDrives=True,
        ).execute()

    meta = fetch_meta(file_id, "id,name,mimeType,shortcutDetails")
    if meta.get("mimeType") == shortcut_mime:
        target = (meta.get("shortcutDetails") or {}).get("targetId", "")
        if not target:
            raise RuntimeError("Es shortcut pero no trae targetId.")
        file_id = target
        meta = fetch_meta(file_id, "id,name,mimeType")

    if meta.get("mimeType") == gdoc_mime:
        return file_id, meta, False

    # Not a Google Doc: ask Drive for a copy with the Google Docs MIME type.
    new_name = (meta.get("name") or "Documento") + " (Google Docs)"
    converted = drive_service.files().copy(
        fileId=file_id,
        body={"name": new_name, "mimeType": gdoc_mime},
        fields="id,name,mimeType",
        supportsAllDrives=True,
    ).execute()
    return converted["id"], converted, True
def docs_get(docs_service, doc_id: str, include_tabs_content: bool = False):
    """Fetch the document *doc_id* through the Docs API and return it.

    Args:
        docs_service: Docs API client (as built by ``build("docs", "v1", ...)``).
        doc_id: Id of the document to fetch.
        include_tabs_content: When True, ``includeTabsContent=True`` is sent so
            the response carries the ``tabs`` field (content of every tab)
            instead of the top-level ``body``. The default (False) sends the
            request exactly as before, without the parameter, which also keeps
            working with client libraries that do not know it.

    Returns:
        The document resource as a dict.
    """
    request_kwargs = {"documentId": doc_id}
    if include_tabs_content:
        request_kwargs["includeTabsContent"] = True
    return docs_service.documents().get(**request_kwargs).execute()
# -----------------------------------------------------------
# DOC STRUCTURE HELPERS
# -----------------------------------------------------------
def _norm_alnum(s: str) -> str:
    """Reduce *s* to a lower-case, accent-free, alphanumeric-only string.

    The text is NFKD-decomposed and lower-cased; combining marks (category
    ``Mn``) and every non-alphanumeric character are dropped. ``None`` yields
    ``""``; non-string values are converted with ``str`` first.
    """
    if s is None:
        return ""
    decomposed = unicodedata.normalize("NFKD", str(s)).lower()
    return "".join(
        ch
        for ch in decomposed
        if unicodedata.category(ch) != "Mn" and ch.isalnum()
    )
def _walk_tabs(tabs):
    """Yield every tab in *tabs* depth-first, each parent before its ``childTabs``.

    A falsy *tabs* (``None`` or empty) yields nothing.
    """
    # Explicit stack instead of recursion; items are pushed reversed so they
    # pop in their original order.
    pending = list(tabs)[::-1] if tabs else []
    while pending:
        tab = pending.pop()
        yield tab
        children = tab.get("childTabs") or []
        pending.extend(list(children)[::-1])
def _get_tab_body_content(doc, tab_id):
    """Return the list of body content elements for *tab_id* in *doc*.

    When *doc* has no ``tabs``, the top-level ``body.content`` is returned and
    *tab_id* is ignored. Otherwise the first tab (searching nested tabs too)
    whose ``tabProperties.tabId`` equals *tab_id* supplies the content; a
    missing id on either side is compared as ``""``. An empty list is returned
    when nothing matches.
    """
    tabs = doc.get("tabs")
    if not tabs:
        return (doc.get("body") or {}).get("content", [])
    wanted = tab_id or ""
    for tab in _walk_tabs(tabs):
        props = tab.get("tabProperties") or {}
        if (props.get("tabId") or "") == wanted:
            body = (tab.get("documentTab") or {}).get("body") or {}
            return body.get("content", [])
    return []
def _iter_text_chars_from_body(doc, tab_id):
    """Yield ``(document_index, character)`` for every text-run character.

    The body content of *tab_id* is walked in order, descending into table
    cells and tables of contents. Paragraph elements without a ``textRun`` or
    without a ``startIndex`` are skipped.

    The Docs API measures indexes in UTF-16 code units, so a character outside
    the Basic Multilingual Plane (code point above U+FFFF) advances the index
    by two; counting one per Python character would shift every index that
    follows such a character within the same text run.
    """
    content = _get_tab_body_content(doc, tab_id)

    def walk(content_list):
        for el in content_list or []:
            if not isinstance(el, dict):
                continue
            p = el.get("paragraph")
            if p:
                for pe in p.get("elements", []):
                    tr = pe.get("textRun")
                    if not tr:
                        continue
                    txt = tr.get("content", "")
                    st = pe.get("startIndex")
                    if st is None or txt is None:
                        continue
                    idx = int(st)
                    for ch in txt:
                        yield idx, ch
                        # Astral characters occupy a surrogate pair in UTF-16.
                        idx += 2 if ord(ch) > 0xFFFF else 1
                continue
            t = el.get("table")
            if t:
                for row in t.get("tableRows", []):
                    for cell in row.get("tableCells", []):
                        yield from walk(cell.get("content", []))
                continue
            toc = el.get("tableOfContents")
            if toc:
                yield from walk(toc.get("content", []))
                continue

    yield from walk(content)
def find_marker_in_body(doc, marker: str):
    """Locate *marker* in the document body, ignoring case, accents and punctuation.

    Both the marker and the document text are reduced with ``_norm_alnum``
    (lower-case, accent-free, alphanumeric only) before comparing, so
    ``"DETALLES_TÉCNICOS"`` matches ``"detalles tecnicos"``. Applying the same
    normalization to both sides matters: folding accents only in the marker
    would make an accented marker impossible to find.

    When *doc* has ``tabs``, each tab (nested ones included) is searched in
    turn; otherwise the single top-level body is searched.

    Returns:
        ``{"tabId": ..., "start": int, "end": int}`` for the first match, where
        ``start``/``end`` are document indexes spanning from the first to the
        last matched character (``end`` exclusive), or ``None`` when the marker
        is empty after normalization or is not found.
    """
    target = _norm_alnum(marker)
    if not target:
        return None
    tabs = doc.get("tabs")
    if tabs:
        tab_ids = [
            ((t.get("tabProperties") or {}).get("tabId")) or ""
            for t in _walk_tabs(tabs)
        ]
    else:
        tab_ids = [None]
    for tab_id in tab_ids:
        norm_chars = []
        # For every normalized character: where its source character starts
        # and ends in the document.
        norm_starts = []
        norm_ends = []
        for doc_i, ch in _iter_text_chars_from_body(doc, tab_id):
            start = int(doc_i)
            # Indexes are UTF-16 code units; astral characters take two.
            end = start + (2 if ord(ch) > 0xFFFF else 1)
            # One source character may normalize to zero, one or several
            # characters (e.g. a ligature); all of them map back to it.
            for norm_ch in _norm_alnum(ch):
                norm_chars.append(norm_ch)
                norm_starts.append(start)
                norm_ends.append(end)
        pos = "".join(norm_chars).find(target)
        if pos != -1:
            return {
                "tabId": tab_id,
                "start": norm_starts[pos],
                "end": norm_ends[pos + len(target) - 1],
            }
    return None
def _loc(index, tab_id=None):
    """Build a Docs API location dict; ``tabId`` is added only when truthy."""
    if not tab_id:
        return {"index": int(index)}
    return {"index": int(index), "tabId": tab_id}
def _range(start, end, tab_id=None):
    """Build a Docs API range dict; ``tabId`` is added only when truthy."""
    if not tab_id:
        return {"startIndex": int(start), "endIndex": int(end)}
    return {"startIndex": int(start), "endIndex": int(end), "tabId": tab_id}
# -----------------------------------------------------------
# DATA PARSING / FORMAT HELPERS
# -----------------------------------------------------------
def normalizar_espacios(texto: str) -> str:
    """Strip *texto* and collapse each run of whitespace into one space.

    Falsy input (``None``, ``""``, ``0``) yields ``""``; other non-string
    values are converted with ``str`` first.
    """
    recortado = str(texto or "").strip()
    return re.sub(r"\s+", " ", recortado)
def parse_especificaciones(raw: Any) -> List[Dict[str, str]]:
    """Turn *raw* into a list of specification items.

    A list is returned as-is; ``None`` and blank text give ``[]``. Any other
    value is converted to text and parsed first as JSON, then as a Python
    literal; the first parser that produces a list wins.

    Raises:
        RuntimeError: Neither parser produced a list.
    """
    if isinstance(raw, list):
        return raw
    if raw is None:
        return []
    texto = str(raw).strip()
    if not texto:
        return []
    for parser in (json.loads, ast.literal_eval):
        try:
            data = parser(texto)
        except Exception:
            # This format did not fit; try the next one.
            continue
        if isinstance(data, list):
            return data
    raise RuntimeError("No se pudo interpretar especificaciones_tecnicas como JSON ni como lista Python.")
def dividir_en_bullets(texto: str) -> List[str]:
    """Split *texto* into sentence-like pieces, one per bullet.

    The text is whitespace-normalized first. A split happens at whitespace
    that follows ``.``, ``!`` or ``?`` and precedes an upper-case letter
    (including Spanish accented ones) or a digit. Empty pieces are dropped.
    """
    limpio = normalizar_espacios(texto)
    if not limpio:
        return []
    bullets = []
    for trozo in re.split(r"(?<=[.!?])\s+(?=(?:[A-ZÁÉÍÓÚÑÜ]|\d))", limpio):
        trozo = trozo.strip()
        if trozo:
            bullets.append(trozo)
    return bullets
def construir_bloque_formateado(especificaciones_tecnicas: List[Dict[str, str]]):
    """Build the text block to insert plus the ranges that need formatting.

    Each item with a non-empty ``clave`` produces a title line followed by one
    line per bullet obtained from its ``valor``; consecutive items are
    separated by one blank line. Items without a ``clave`` are skipped.

    Returns:
        ``(texto, rangos_titulos, rangos_bullets)``. Both range lists hold
        ``(start, end)`` offsets relative to the start of *texto*, excluding
        the trailing newline of the line. Offsets are counted in UTF-16 code
        units because callers add them to Google Docs indexes, which use that
        unit; for text without characters above U+FFFF they equal plain string
        offsets.
    """

    def utf16_len(texto: str) -> int:
        # Characters above U+FFFF occupy two UTF-16 code units.
        return len(texto) + sum(1 for ch in texto if ord(ch) > 0xFFFF)

    items_validos = []
    for item in especificaciones_tecnicas or []:
        clave = normalizar_espacios((item or {}).get("clave", ""))
        valor = (item or {}).get("valor", "")
        if clave:
            items_validos.append({"clave": clave, "valor": valor})

    partes: List[str] = []
    rangos_titulos: List[Tuple[int, int]] = []
    rangos_bullets: List[Tuple[int, int]] = []
    cursor = 0
    ultimo = len(items_validos) - 1
    for i, item in enumerate(items_validos):
        titulo_completo = item["clave"]
        fin_titulo = cursor + utf16_len(titulo_completo)
        rangos_titulos.append((cursor, fin_titulo))
        partes.append(titulo_completo)
        partes.append("\n")
        cursor = fin_titulo + 1  # +1 for the newline that ends the title

        for bullet in dividir_en_bullets(item.get("valor", "")):
            fin_bullet = cursor + utf16_len(bullet)
            rangos_bullets.append((cursor, fin_bullet))
            partes.append(bullet)
            partes.append("\n")
            cursor = fin_bullet + 1  # +1 for the newline that ends the bullet

        # Blank line between items, but not after the last one.
        if i < ultimo:
            partes.append("\n")
            cursor += 1
    return "".join(partes), rangos_titulos, rangos_bullets
# -----------------------------------------------------------
# MAIN ACTION
# -----------------------------------------------------------
def reemplazar_detalles_tecnicos(
    docs_service,
    document_id: str,
    marcador: str,
    especificaciones_tecnicas: List[Dict[str, str]],
):
    """Replace *marcador* in the document with the formatted specifications.

    The marker text is deleted, the block built by
    ``construir_bloque_formateado`` is inserted at the same position, every
    title is set to bold and every detail line becomes a bulleted paragraph.

    All requests are sent in a single ``batchUpdate``. Requests in one batch
    are applied in order, so the indexes are the same as with separate calls,
    but the batch is applied as a whole or not at all: a failure can no longer
    leave the document with the marker removed and nothing inserted, which
    would make a retry impossible.

    Args:
        docs_service: Docs API client.
        document_id: Id of the document to edit.
        marcador: Marker text to look for (matched by ``find_marker_in_body``).
        especificaciones_tecnicas: Items with ``clave`` / ``valor`` entries.

    Returns:
        A summary dict with ``ok``, ``document_id``, ``marcador``,
        ``titulos_formateados``, ``bullets_formateados`` and ``texto_insertado``.

    Raises:
        RuntimeError: The marker is not found, or there is nothing to insert.
    """
    documento = docs_get(docs_service, document_id)
    hit = find_marker_in_body(documento, marcador)
    if not hit:
        raise RuntimeError(f"No se encontró el marcador en el BODY del documento: {marcador}")
    marcador_inicio = int(hit["start"])
    marcador_fin = int(hit["end"])
    tab_id = hit.get("tabId")

    texto_insertar, rangos_titulos, rangos_bullets = construir_bloque_formateado(especificaciones_tecnicas)
    if not texto_insertar.strip():
        raise RuntimeError("especificaciones_tecnicas está vacío o no contiene elementos válidos.")

    # Remove the marker, then insert the new text where it started.
    requests = [
        {
            "deleteContentRange": {
                "range": _range(marcador_inicio, marcador_fin, tab_id)
            }
        },
        {
            "insertText": {
                "location": _loc(marcador_inicio, tab_id),
                "text": texto_insertar,
            }
        },
    ]
    # 1) Each title stays as normal document text, with its real number
    #    included, and in bold.
    for ini, fin in rangos_titulos:
        requests.append(
            {
                "updateTextStyle": {
                    "range": _range(marcador_inicio + ini, marcador_inicio + fin, tab_id),
                    "textStyle": {"bold": True},
                    "fields": "bold",
                }
            }
        )
    # 2) Only the detail lines get a simple bullet; +1 extends the range over
    #    the newline that ends the line.
    for ini, fin in rangos_bullets:
        requests.append(
            {
                "createParagraphBullets": {
                    "range": _range(marcador_inicio + ini, marcador_inicio + fin + 1, tab_id),
                    "bulletPreset": SIMPLE_BULLET_PRESET,
                }
            }
        )

    docs_service.documents().batchUpdate(
        documentId=document_id,
        body={"requests": requests},
    ).execute()

    return {
        "ok": True,
        "document_id": document_id,
        "marcador": marcador,
        "titulos_formateados": len(rangos_titulos),
        "bullets_formateados": len(rangos_bullets),
        "texto_insertado": texto_insertar,
    }
# Rocketbot quirk fix: copy every name defined so far from locals() into globals().
# NOTE(review): presumably Rocketbot executes this script with separate
# globals/locals mappings, so without this the functions above could not see
# each other — confirm. When run as a normal module this is a no-op.
globals().update(locals())
# -----------------------------------------------------------
# MAIN
# -----------------------------------------------------------
try:
    # Clear the error variable before starting.
    _sv("error_detalles_tecnicos", "")

    # --- Target document: taken from the current_url variable ---
    url_doc = _gvs("current_url", "")
    if not url_doc:
        raise RuntimeError("Falta current_url con la URL exacta del documento a procesar.")
    raw_id = extract_doc_id_from_url(url_doc)
    if not raw_id:
        raise RuntimeError("No pude extraer documentId de la URL.")

    # --- Credentials file: relative paths are resolved against base_dir ---
    cred_path = _gvs("gdoc_sa_json", "")
    if not cred_path:
        raise RuntimeError("Falta la variable gdoc_sa_json.")
    if not os.path.isabs(cred_path):
        cred_path = os.path.join(base_dir, cred_path)
    if not os.path.exists(cred_path):
        raise RuntimeError("No existe credentials.json: " + cred_path)

    # --- OAuth token file: falls back to a default location under base_dir ---
    token_path = _gvs("gdoc_token_json", "") or os.path.join(
        base_dir, "credentials", "token_gdocs_drive.json"
    )
    if not os.path.isabs(token_path):
        token_path = os.path.join(base_dir, token_path)

    # --- Google clients ---
    docs_service, drive_service, auth_mode = get_services(cred_path, token_path)
    _sv("gdoc_auth_mode", auth_mode)

    # --- Make sure we are working on a real Google Docs document ---
    doc_id, meta_file, converted = ensure_docs_api_compatible(drive_service, raw_id)
    _sv("gdoc_id", doc_id)
    _sv("gdoc_converted", "1" if converted else "0")
    _sv("gdoc_url_final", f"https://docs.google.com/document/d/{doc_id}/edit")
    _sv("gdoc_name", meta_file.get("name", ""))

    # --- Marker to replace ---
    marcador = _gvs("marker_detalles_tecnicos", MARCADOR_DEFAULT)
    _sv("gdoc_marker_detalles_tecnicos_used", marcador)

    # --- Specifications: a global of that name wins over the host variable ---
    raw_especificaciones = globals().get("especificaciones_tecnicas")
    if raw_especificaciones is None:
        raw_especificaciones = _gv("especificaciones_tecnicas", None)
    especificaciones_tecnicas = parse_especificaciones(raw_especificaciones)

    # --- Do the replacement and publish the outcome ---
    resultado = reemplazar_detalles_tecnicos(
        docs_service=docs_service,
        document_id=doc_id,
        marcador=marcador,
        especificaciones_tecnicas=especificaciones_tecnicas,
    )
    _sv("status_proceso", "DETALLES_TECNICOS insertado correctamente")
    _sv("detalles_tecnicos_insertados", "1")
    _sv("detalles_tecnicos_resultado", json.dumps(resultado, ensure_ascii=False))
    print("DETALLES_TECNICOS insertado correctamente")
    print(json.dumps(resultado, ensure_ascii=False, indent=2))
except Exception as e:
    # Report the failure through the host variables, then let it propagate.
    import traceback

    error_msg = f"Error en script: {str(e)} - {traceback.format_exc()}"
    print(error_msg)
    _sv("status_proceso", "ERROR")
    _sv("error_detalles_tecnicos", error_msg)
    raise