76 lines
2.2 KiB
Python
76 lines
2.2 KiB
Python
from aws_lambda_powertools import Logger
|
|
|
|
# Module-level logger used by the admin_* helpers below to record failed
# identity-provider calls.
# NOTE(review): Powertools' Logger takes `service` as its first positional
# argument, so this presumably sets the service name to the module name --
# confirm that is intended.
logger = Logger(__name__)
|
|
|
|
|
|
class UnauthorizedError(Exception):
    """Signals that a caller could not be authorized.

    Raised by ``get_user`` when the identity provider client rejects the
    lookup with a ``ClientError``.
    """
|
|
|
|
|
|
def get_user(access_token: str, /, idp_client) -> dict[str, str]:
    """Look up the attributes of the user that owns ``access_token``.

    Args:
        access_token: Token forwarded to the client as ``AccessToken``
            (positional-only).
        idp_client: Identity provider client exposing ``get_user`` and
            ``exceptions.ClientError``.

    Returns:
        A mapping built from the response's ``UserAttributes`` entries,
        keyed by each entry's ``Name`` with its ``Value`` as the value.

    Raises:
        UnauthorizedError: If the client call raises
            ``idp_client.exceptions.ClientError``.
    """
    try:
        response = idp_client.get_user(AccessToken=access_token)
    except idp_client.exceptions.ClientError:
        raise UnauthorizedError()

    # Flatten the list of {'Name': ..., 'Value': ...} entries into one dict.
    # Done outside the ``try`` so a malformed response is not reported as an
    # authorization failure.
    attributes = {}
    for entry in response['UserAttributes']:
        attributes[entry['Name']] = entry['Value']
    return attributes
|
|
|
|
|
|
def admin_get_user(
|
|
sub: str,
|
|
user_pool_id: str,
|
|
*,
|
|
idp_client,
|
|
) -> dict[str, str] | None:
|
|
"""Gets the specified user by user name in a user pool as an administrator.
|
|
Works on any user.
|
|
|
|
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp/client/admin_get_user.html
|
|
- https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_AdminGetUser.html
|
|
"""
|
|
try:
|
|
user = idp_client.admin_get_user(Username=sub, UserPoolId=user_pool_id)
|
|
except idp_client.exceptions as err:
|
|
logger.exception(err)
|
|
return None
|
|
else:
|
|
return user
|
|
|
|
|
|
def admin_set_user_password(
    username: str,
    password: str,
    *,
    user_pool_id: str,
    permanent: bool = False,
    idp_client,
) -> bool:
    """Sets the specified user's password in a user pool as an administrator.

    Works on any user.

    The password can be temporary or permanent. If it is temporary, the user
    status enters the FORCE_CHANGE_PASSWORD state.

    When the user next tries to sign in, the InitiateAuth/AdminInitiateAuth
    response will contain the NEW_PASSWORD_REQUIRED challenge.

    If the user doesn't sign in before the temporary password expires, the
    user won't be able to sign in, and an administrator must reset their
    password.

    Once the user has set a new password, or the password is permanent,
    the user status is set to Confirmed.

    Args:
        username: Value passed to the client as ``Username``.
        password: Value passed to the client as ``Password``.
        user_pool_id: Value passed to the client as ``UserPoolId``.
        permanent: Value passed to the client as ``Permanent``; defaults to
            ``False``.
        idp_client: Identity provider client exposing
            ``admin_set_user_password`` and ``exceptions.ClientError``.

    Returns:
        ``True`` when the client call completes, ``False`` when it fails with
        a ``ClientError`` (the error is logged).
    """
    try:
        idp_client.admin_set_user_password(
            UserPoolId=user_pool_id,
            Username=username,
            Password=password,
            Permanent=permanent,
        )
    # ``idp_client.exceptions`` is only the container of exception classes;
    # naming it directly in ``except`` makes Python raise TypeError while
    # matching, so ``False`` was never returned. Catch the class itself.
    except idp_client.exceptions.ClientError as err:
        logger.exception(err)
        return False
    else:
        return True
|