Files
saladeaula.digital/api.saladeaula.digital/app/routes/courses/__init__.py
2025-10-03 21:58:35 -03:00

84 lines
2.2 KiB
Python

from http import HTTPStatus
from io import BytesIO
from typing import Any
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
BadRequestError,
NotFoundError,
)
from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
from pydantic import UUID4, BaseModel
from api_gateway import JSONResponse
from boto3clients import dynamodb_client
from config import COURSE_TABLE
from form_data import parse
logger = Logger(__name__)
router = Router()
dyn = DynamoDBPersistenceLayer(COURSE_TABLE, dynamodb_client)
@router.get('/<course_id>')
def get_course(course_id: str):
return dyn.collection.get_item(
KeyPair(course_id, '0'),
exc_cls=NotFoundError,
)
class Cert(BaseModel):
exp_interval: int | None = None
rawfile: bytes | None = None
def model_dump(self, **kwargs) -> dict[str, Any]:
return super().model_dump(
exclude={'rawfile'},
exclude_none=True,
**kwargs,
)
class Course(BaseModel):
id: UUID4
name: str
access_period: int
cert: Cert | None = None
@router.put('/<course_id>')
def edit_course(course_id: str):
event = router.current_event
if not event.decoded_body:
raise BadRequestError('Invalid request body')
body = BytesIO(event.decoded_body.encode())
course = Course.model_validate(
{'id': course_id} | parse(event.headers, body),
)
now_ = now()
with dyn.transact_writer() as transact:
transact.update(
key=KeyPair(str(course.id), '0'),
update_expr='SET #name = :name, access_period = :access_period, \
cert = :cert, updated_at = :updated_at',
expr_attr_names={
'#name': 'name',
},
expr_attr_values={
':name': course.name,
':cert': course.cert.model_dump() if course.cert else None,
':access_period': course.access_period,
':updated_at': now_,
},
cond_expr='attribute_exists(sk)',
exc_cls=BadRequestError,
)
return JSONResponse(HTTPStatus.NO_CONTENT)