import random import string from dataclasses import dataclass from urllib.parse import quote_plus, urlparse import requests from aws_lambda_powertools.event_handler.exceptions import BadRequestError from glom import glom from config import KONVIVA_API_URL, KONVIVA_SECRET_KEY ALU = 2 GES = 3 headers = { 'Authorization': f'KONVIVA {KONVIVA_SECRET_KEY}', 'Content-Type': 'application/json', } def random_str(maxlen: int = 10) -> str: """Returns a random string of letters.""" return ''.join(random.choice(string.ascii_letters) for _ in range(maxlen)) class KonvivaError(BadRequestError): pass class EmailAlreadyExists(KonvivaError): pass @dataclass(frozen=True) class User: IDUsuario: int Identificador: str NomeUsuario: str Email: str CPF: str | None = None def create_user( id: str, name: str, email: str, cpf: str | None, ) -> dict: url = urlparse(KONVIVA_API_URL)._replace( path='/action/api/integrarUsuario', query='sendMail=false', ) r = requests.post( url=url.geturl(), headers=headers, json={ 'Identificador': id, 'NomeUsuario': name, 'Login': email, 'Email': email, 'CPF': cpf, 'Senha': random_str(30), 'Situacao': 'ATIVO', 'UnidadesPerfil': [ { 'IDPerfil': ALU, 'CODPerfil': 'ALU', 'CODUnidade': '1380e6d0-fee8-4cb9-9bf0-a087e703b1a4', } ], }, ) r.raise_for_status() # Because Konviva does not return the proper HTTP status code if err := glom(r.json(), 'errors', default=None): err = err[0] if isinstance(err, list) else err raise EmailAlreadyExists(err) return r.json().get('IDUsuario') def get_users_by_email(email: str) -> list[dict]: url = urlparse(KONVIVA_API_URL)._replace( path='/action/api/getUsuariosByQuery', query=f'q=Email=={quote_plus(email)}', ) r = requests.get( url=url.geturl(), headers=headers, ) r.raise_for_status() if glom(r.json(), '0.errors', default=None): return [] return r.json()