Files
saladeaula.digital/konviva-events/app/konviva.py
2025-07-18 00:14:45 -03:00

154 lines
3.4 KiB
Python

from urllib.parse import quote_plus, urlparse
import requests
from aws_lambda_powertools.event_handler.exceptions import BadRequestError
from glom import glom
from layercake.strutils import random_str
from config import KONVIVA_API_URL, KONVIVA_SECRET_KEY
# Numeric Konviva profile IDs. Within this module only ALU is used: it is sent
# as 'IDPerfil' alongside 'CODPerfil': 'ALU' when a user is created.
# NOTE(review): ADM and GES are presumably the IDs of the other Konviva
# profiles -- confirm against the Konviva API documentation.
ADM, ALU, GES = 1, 2, 3
# HTTP headers passed to every Konviva request made by this module: the
# secret key in the 'KONVIVA <key>' authorization scheme, and a JSON
# content type.
_authorization = 'KONVIVA {}'.format(KONVIVA_SECRET_KEY)
headers = {
    'Authorization': _authorization,
    'Content-Type': 'application/json',
}
class KonvivaError(BadRequestError):
    """Error reported by the Konviva API in the body of a response.

    Raised by the functions in this module when the decoded JSON response
    contains an ``errors`` entry. It derives from Powertools'
    ``BadRequestError`` and adds no behavior of its own.
    """
class EmailAlreadyExistsError(KonvivaError):
    """The login/e-mail is already registered in Konviva.

    Raised by ``create_user`` when Konviva answers with the error
    ``'Login já existente'``.
    """
def create_user(
    id: str,
    name: str,
    email: str,
    cpf: str | None,
    *,
    timeout: float = 30.0,
) -> int:
    """Create a user in Konviva and return the ID Konviva assigned to it.

    The user is created with status ``'ATIVO'``, with ``email`` used as both
    login and e-mail, a password generated by ``random_str(30)``, and the
    ``ALU`` profile on a hard-coded unit. The request is sent with
    ``sendMail=false``.

    Args:
        id: Value sent as ``'Identificador'``.
        name: Value sent as ``'NomeUsuario'``.
        email: Value sent as both ``'Login'`` and ``'Email'``.
        cpf: Value sent as ``'CPF'``; may be ``None``.
        timeout: Seconds ``requests`` waits for Konviva before giving up.
            Without a timeout a stalled connection would block the caller
            indefinitely.

    Returns:
        The ``'IDUsuario'`` field of the response, converted to ``int``.

    Raises:
        EmailAlreadyExistsError: Konviva reported ``'Login já existente'``.
        KonvivaError: Konviva reported any other error, or the response
            carries no ``'IDUsuario'``.
        requests.RequestException: The request failed, timed out, or came
            back with an HTTP error status.
    """
    url = urlparse(KONVIVA_API_URL)._replace(
        path='/action/api/integrarUsuario',
        query='sendMail=false',
    )

    r = requests.post(
        url=url.geturl(),
        headers=headers,
        json={
            'Identificador': id,
            'NomeUsuario': name,
            'Login': email,
            'Email': email,
            'CPF': cpf,
            'Senha': random_str(30),
            'Situacao': 'ATIVO',
            'UnidadesPerfil': [
                {
                    'IDPerfil': ALU,
                    'CODPerfil': 'ALU',
                    # NOTE(review): unit code is hard-coded; presumably the
                    # single unit every student belongs to -- confirm.
                    'CODUnidade': '1380e6d0-fee8-4cb9-9bf0-a087e703b1a4',
                }
            ],
        },
        timeout=timeout,
    )
    r.raise_for_status()
    data = r.json()

    # Because Konviva does not return the proper HTTP status code
    if err := glom(data, 'errors', default=None):
        err = err[0] if isinstance(err, list) else err
        if err == 'Login já existente':
            raise EmailAlreadyExistsError(err)
        raise KonvivaError(err)

    # A response with neither 'errors' nor 'IDUsuario' used to crash with a
    # bare TypeError from int(None); report it as a Konviva failure instead.
    user_id = glom(data, 'IDUsuario', default=None)
    if user_id is None:
        raise KonvivaError('Konviva response is missing IDUsuario')

    return int(user_id)
def update_user(id: str, **kwargs) -> dict:
    """Update an existing Konviva user and return the decoded response.

    Args:
        id: Konviva user ID; converted to ``int`` and sent as
            ``'IDUsuario'``.
        **kwargs: Extra fields merged verbatim into the JSON payload. A key
            named ``'IDUsuario'`` here overrides the value derived from
            ``id``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        KonvivaError: The response body contains an ``errors`` entry.
        requests.RequestException: The request failed, timed out, or came
            back with an HTTP error status.
    """
    url = urlparse(KONVIVA_API_URL)._replace(
        path='/action/api/integrarUsuario',
        query='sendMail=false',
    )

    payload = {
        'IDUsuario': int(id),
        **kwargs,
    }

    r = requests.put(
        url=url.geturl(),
        headers=headers,
        json=payload,
        # Bound the wait so a stalled Konviva connection cannot block the
        # caller forever. Kept as a fixed value rather than a parameter
        # because every keyword argument is forwarded as a payload field.
        timeout=30,
    )
    r.raise_for_status()
    data = r.json()

    # Konviva reports failures in the body rather than via the status code.
    if err := glom(data, 'errors', default=None):
        err = err[0] if isinstance(err, list) else err
        raise KonvivaError(err)

    return data
def get_users_by_email(email: str, *, timeout: float = 30.0) -> list[dict]:
    """Look up Konviva users whose e-mail or login equals ``email``.

    Args:
        email: Address to search for; it is URL-encoded with ``quote_plus``
            before being placed in the query.
        timeout: Seconds ``requests`` waits for Konviva before giving up.
            Without a timeout a stalled connection would block the caller
            indefinitely.

    Returns:
        The decoded JSON body of the response, or an empty list when the
        first element of the response carries an ``errors`` entry.

    Raises:
        requests.RequestException: The request failed, timed out, or came
            back with an HTTP error status.
    """
    # Encode once; the same value is used on both sides of the query.
    encoded = quote_plus(email)
    url = urlparse(KONVIVA_API_URL)._replace(
        path='/action/api/getUsuariosByQuery',
        query=f'q=Email=={encoded} or Login=={encoded}',
    )

    r = requests.get(
        url=url.geturl(),
        headers=headers,
        timeout=timeout,
    )
    r.raise_for_status()
    data = r.json()

    # NOTE(review): any error on the first element is treated as "no users";
    # presumably this is how Konviva signals an empty result -- confirm.
    if glom(data, '0.errors', default=None):
        return []

    return data
def _post_enrollment(json: dict, *, timeout: float = 30.0) -> dict:
    """POST an enrollment payload to Konviva and return the decoded response.

    Args:
        json: Payload sent as the JSON body of the request.
        timeout: Seconds ``requests`` waits for Konviva before giving up.
            Without a timeout a stalled connection would block the caller
            indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        KonvivaError: The response body contains an ``errors`` entry.
        requests.RequestException: The request failed, timed out, or came
            back with an HTTP error status.
    """
    url = urlparse(KONVIVA_API_URL)._replace(path='/action/api/integrarMatricula')

    r = requests.post(
        url=url.geturl(),
        headers=headers,
        json=json,
        timeout=timeout,
    )
    r.raise_for_status()
    data = r.json()

    # Konviva reports failures in the body rather than via the status code.
    if err := glom(data, 'errors', default=None):
        err = err[0] if isinstance(err, list) else err
        raise KonvivaError(err)

    return data
def enroll(user_id: str, class_id: str) -> int:
    """Enroll a user in a class and return the Konviva enrollment ID.

    Args:
        user_id: Konviva user ID; converted to ``int`` and sent as
            ``'IDUsuario'``.
        class_id: Konviva class ID; converted to ``int`` and sent as
            ``'IDTurma'``.

    Returns:
        The ``'IDMatricula'`` field of the response, converted to ``int``.

    Raises:
        KonvivaError: Konviva reported an error, or the response carries no
            ``'IDMatricula'``.
    """
    response = _post_enrollment(
        {
            'IDUsuario': int(user_id),
            'IDTurma': int(class_id),
            'StatusMatricula': 'MATRICULADO',
        }
    )

    # A response without 'IDMatricula' used to crash with a bare TypeError
    # from int(None); report it as a Konviva failure instead.
    enrollment_id = glom(response, 'IDMatricula', default=None)
    if enrollment_id is None:
        raise KonvivaError('Konviva response is missing IDMatricula')

    return int(enrollment_id)
def cancel_enrollment(id: str) -> dict:
    """Cancel an enrollment and return Konviva's decoded response.

    Posts the enrollment ID (as a string, under ``'IDMatricula'``) with
    ``'StatusMatricula'`` set to ``'CANCELADO'`` through
    ``_post_enrollment``.
    """
    payload = {
        'IDMatricula': str(id),
        'StatusMatricula': 'CANCELADO',
    }
    return _post_enrollment(payload)