Files
scripts_zubabot/duplicar_plantilla_contrato.py

169 lines
5.2 KiB
Python

from google.oauth2 import service_account
from googleapiclient.discovery import build
import os
import sys
import json
import re
global Request, InstalledAppFlow, Credentials, service_account, build, os, sys, json, re, base_dir, libs_dir, SCOPES, CREDENTIALS_PATH, TOKEN_PATH, _load_json, get_drive_service, extract_folder_id_from_url, get_doc_name, copy_doc_to_folder, get_doc_url, TEMPLATE_DOC_ID, FOLDER_URL, NEW_DOC_NAME, drive_service, folder_id, copied_doc, doc_url
# ===== Rocketbot-compatible py_libs path =====
base_dir = os.path.dirname(sys.executable) if getattr(
sys, 'frozen', False) else os.getcwd()
libs_dir = os.path.join(base_dir, 'librerias', 'py_libs', 'py310')
if os.path.isdir(libs_dir) and libs_dir not in sys.path:
sys.path.insert(0, libs_dir)
def rb_get_var(name, required=True, default=''):
try:
value = GetVar(name)
except NameError:
value = os.environ.get(name, default)
if value is None:
value = ''
value = str(value).strip()
if required and (not value or value == f'{{{name}}}'):
raise RuntimeError(f'La variable Rocketbot "{name}" está vacía o no fue leída correctamente.')
return value
SCOPES = {scopes_api_google}
CREDENTIALS_PATH = rb_get_var('gdoc_sa_json2')
IMPERSONATED_USER = rb_get_var('gdoc_impersonated_user')
def _load_json(path):
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
def get_drive_service(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER):
info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get('type') != 'service_account':
raise RuntimeError('gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.')
impersonated_user = (impersonated_user or '').strip()
if not impersonated_user:
raise RuntimeError('Falta la variable gdoc_impersonated_user.')
creds = service_account.Credentials.from_service_account_file(
credentials_json_path, scopes=SCOPES, subject=impersonated_user)
drive_service = build(
'drive', 'v3', credentials=creds, cache_discovery=False)
return drive_service
def extract_folder_id_from_url(folder_url):
"""Extrae el ID de una carpeta desde una URL de Google Drive"""
match = re.search(r'/folders/([a-zA-Z0-9_-]+)', folder_url)
if match:
return match.group(1)
return folder_url # Si ya es un ID, lo retorna como está
def get_doc_name(drive_service, doc_id):
"""Obtiene el nombre del documento de Google Drive"""
file_metadata = drive_service.files().get(
fileId=doc_id, fields='name'
).execute()
return file_metadata.get('name', 'Documento')
def copy_doc_to_folder(drive_service, template_doc_id, folder_id, new_doc_name=None):
"""
Copia un documento a una carpeta específica
Args:
drive_service: Servicio de Drive autenticado
template_doc_id: ID del documento plantilla a copiar
folder_id: ID de la carpeta destino
new_doc_name: Nombre opcional para el documento copiado
Returns:
dict con 'id' y 'webViewLink' del documento creado
"""
if new_doc_name is None:
new_doc_name = get_doc_name(
drive_service, template_doc_id) + ' (copia)'
# Copiar el documento
body = {
'name': new_doc_name,
'parents': [folder_id]
}
copied_file = drive_service.files().copy(
fileId=template_doc_id,
body=body,
fields='id, webViewLink, name'
).execute()
return copied_file
def get_doc_url(doc_id):
"""Genera la URL de edición del documento en Google Drive"""
return f"https://docs.google.com/document/d/{doc_id}/edit"
TEMPLATE_DOC_ID = rb_get_var('template_contrato_id')
FOLDER_URL = rb_get_var('drive_contratos_carpeta')
proyecto = rb_get_var('proyecto')
numeracion_dpto = rb_get_var('numeracion_dpto')
nombre_comprador = rb_get_var('nombre_comprador')
NEW_DOC_NAME = f'{proyecto} - {numeracion_dpto} - {nombre_comprador}'
try:
if not os.path.exists(CREDENTIALS_PATH):
raise FileNotFoundError(
f'No existe el archivo de credenciales: {CREDENTIALS_PATH}')
# Inicializar servicio
drive_service = get_drive_service(CREDENTIALS_PATH, IMPERSONATED_USER)
# Extraer ID de carpeta desde URL
folder_id = extract_folder_id_from_url(FOLDER_URL)
# Copiar documento
copied_doc = copy_doc_to_folder(
drive_service, TEMPLATE_DOC_ID, folder_id, NEW_DOC_NAME)
# Generar URL del documento creado
doc_url = get_doc_url(copied_doc['id'])
print(f'Documento copiado exitosamente')
print(f'Nombre: {copied_doc.get("name")}')
print(f'ID: {copied_doc.get("id")}')
print(f'URL: {doc_url}')
# Retornar valores (Rocketbot compatible)
try:
SetVar('current_url', doc_url)
SetVar('gdoc_id', copied_doc['id'])
SetVar('gdoc_url_final', doc_url)
SetVar('doc_copy_created', True)
except NameError:
pass # Si no está en Rocketbot, solo imprime
except Exception as e:
import traceback
error = {
'error': str(e),
'descripcion': traceback.format_exc()
}
try:
SetVar('error', error)
SetVar('doc_copy_created', False)
except NameError:
pass
print(f"Error detectado: {str(e)}")