Scripst ajustados para soportar cuenta de servicio

This commit is contained in:
mabejoyb
2026-05-13 14:20:55 -03:00
parent 912c5b74c3
commit d0c1f77ec3
4 changed files with 65 additions and 152 deletions

View File

@@ -1,6 +1,3 @@
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from googleapiclient.discovery import build
import os
@@ -25,8 +22,8 @@ if os.path.isdir(libs_dir) and libs_dir not in sys.path:
SCOPES = {scopes_api_google}
CREDENTIALS_PATH = '{gdoc_sa_json}'
TOKEN_PATH = '{gdoc_token_json}'
CREDENTIALS_PATH = '{gdoc_sa_json2}'
IMPERSONATED_USER = '{gdoc_impersonated_user}'
def _load_json(path):
@@ -34,39 +31,17 @@ def _load_json(path):
return json.load(f)
def get_google_services(credentials_json_path=CREDENTIALS_PATH, token_json_path=TOKEN_PATH):
def get_google_services(credentials_json_path=CREDENTIALS_PATH, impersonated_user=IMPERSONATED_USER):
info = _load_json(credentials_json_path)
if not isinstance(info, dict) or info.get('type') != 'service_account':
raise RuntimeError('gdoc_sa_json2 debe apuntar a un JSON de cuenta de servicio.')
# Service account
if isinstance(info, dict) and info.get('type') == 'service_account':
creds = service_account.Credentials.from_service_account_file(
credentials_json_path, scopes=SCOPES)
drive_service = build(
'drive', 'v3', credentials=creds, cache_discovery=False)
docs_service = build(
'docs', 'v1', credentials=creds, cache_discovery=False)
return drive_service, docs_service
# OAuth flow
creds = None
if os.path.exists(token_json_path):
creds = Credentials.from_authorized_user_file(token_json_path, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
credentials_json_path, SCOPES)
try:
creds = flow.run_local_server(port=0)
except Exception:
creds = flow.run_console()
os.makedirs(os.path.dirname(token_json_path), exist_ok=True)
with open(token_json_path, 'w', encoding='utf-8') as f:
f.write(creds.to_json())
impersonated_user = (impersonated_user or '').strip()
if not impersonated_user:
raise RuntimeError('Falta la variable gdoc_impersonated_user.')
creds = service_account.Credentials.from_service_account_file(
credentials_json_path, scopes=SCOPES, subject=impersonated_user)
drive_service = build(
'drive', 'v3', credentials=creds, cache_discovery=False)
docs_service = build(
@@ -114,7 +89,7 @@ def replace_vars_doc(docs_service, gdoc_id, replacements_values):
# Iniciar servicios (Solo Drive y Docs)
drive_service, docs_service = get_google_services(CREDENTIALS_PATH, TOKEN_PATH)
drive_service, docs_service = get_google_services(CREDENTIALS_PATH, IMPERSONATED_USER)
# Variables de entrada (esto usualmente viene de Rocketbot via GetVar)
gdoc_id = '{gdoc_id}'