import uuid
from datetime import timedelta
from typing import Optional

from aws_lambda_powertools.event_handler.exceptions import ForbiddenError
from jose import jwt
from jose.exceptions import JWTClaimsError
from layercake.dateutils import now

from config import (
    ISSUER,
    JWT_ALGORITHM,
    JWT_EXP_SECONDS,
    JWT_SECRET,
    OAUTH2_REFRESH_TOKEN_EXPIRES_IN,
)
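def generate_jwt(user_id: str, email: str) -> str:
    """Issue a signed access token for *user_id* that is valid for JWT_EXP_SECONDS.

    Claims written: ``sub`` (the user id), ``email``, ``iat``, ``exp``, ``iss``
    and ``typ='access'``. The ``typ`` claim lets a verifier tell this token
    apart from a refresh token (which carries ``typ='refresh'``), so that a
    long-lived refresh token is not silently accepted where a short-lived
    access token is expected.

    Args:
        user_id: Subject of the token, stored as ``sub``.
        email: Copied into the ``email`` claim. A JWT payload is only
            base64-encoded, not encrypted, so anyone holding the token can
            read it.

    Returns:
        The compact-serialised JWT signed with JWT_SECRET using JWT_ALGORITHM.
    """
    # NOTE(review): now() is assumed to return an aware UTC datetime so that
    # .timestamp() yields a correct epoch value — confirm in layercake.dateutils.
    issued_at = now()
    expires_at = issued_at + timedelta(seconds=JWT_EXP_SECONDS)
    claims = {
        'sub': user_id,
        'email': email,
        'iat': int(issued_at.timestamp()),
        'exp': int(expires_at.timestamp()),
        'iss': ISSUER,
        # Marks this as an access token; refresh tokens carry typ='refresh'.
        'typ': 'access',
    }
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)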
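def generate_refresh_token(user_id: str) -> str:
    """Issue a signed refresh token for *user_id* valid for OAUTH2_REFRESH_TOKEN_EXPIRES_IN seconds.

    Claims written: ``sub``, ``iat``, ``exp``, ``iss``, ``typ='refresh'`` and
    a random ``jti``. ``typ`` distinguishes the token from an access token.
    ``jti`` gives every token a unique identifier: without it two tokens
    issued to the same user within the same second are byte-identical, and a
    verifier has nothing to key a revocation or already-redeemed list on. The
    claim alone revokes nothing — the caller has to record it.

    Args:
        user_id: Subject of the token, stored as ``sub``.

    Returns:
        The compact-serialised JWT signed with JWT_SECRET using JWT_ALGORITHM.
    """
    # NOTE(review): now() is assumed to return an aware UTC datetime — confirm
    # in layercake.dateutils.
    issued_at = now()
    expires_at = issued_at + timedelta(seconds=OAUTH2_REFRESH_TOKEN_EXPIRES_IN)
    claims = {
        'sub': user_id,
        'iat': int(issued_at.timestamp()),
        'exp': int(expires_at.timestamp()),
        'iss': ISSUER,
        'typ': 'refresh',
        # Unique per token; lets the server revoke or single-use this token.
        'jti': str(uuid.uuid4()),
    }
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)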
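def verify_jwt(token: str, *, expected_typ: Optional[str] = None) -> dict:
    """Decode and validate a token issued by this module and return its claims.

    The signature is checked with JWT_SECRET using only JWT_ALGORITHM, so a
    token advertising any other ``alg`` is rejected, and ``iss`` must equal
    ISSUER. ``exp``, ``sub`` and ``iss`` are mandatory: a token missing one of
    them is rejected instead of being treated as non-expiring or anonymous.
    Sixty seconds of clock skew is tolerated on the time-based claims.

    Args:
        token: The compact-serialised JWT to verify.
        expected_typ: When given, the token's ``typ`` claim must equal this
            value (``'refresh'`` for tokens from generate_refresh_token).
            Pass it to stop a long-lived refresh token from being accepted
            where an access token is expected. ``None`` (the default) skips
            the check.

    Returns:
        The decoded claims as a dict.

    Raises:
        jose.exceptions.JWTError or a subclass (ExpiredSignatureError,
        JWTClaimsError): bad signature, expired or missing claims, wrong
        issuer, or a ``typ`` mismatch.
    """
    # python-jose expresses mandatory claims as individual ``require_<claim>``
    # flags; a PyJWT-style ``'require': [...]`` list is merged into the options
    # dict and silently ignored, which would leave tokens without ``exp``
    # accepted as never expiring.
    payload = jwt.decode(
        token,
        JWT_SECRET,
        algorithms=[JWT_ALGORITHM],
        issuer=ISSUER,
        options={
            'require_exp': True,
            'require_sub': True,
            'require_iss': True,
            'leeway': 60,
        },
    )
    if expected_typ is not None and payload.get('typ') != expected_typ:
        raise JWTClaimsError('Unexpected token type')
    return payload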