112 lines
2.7 KiB
Python
112 lines
2.7 KiB
Python
import time
|
|
from dataclasses import dataclass, field
|
|
|
|
from authlib.oauth2.rfc6749 import (
|
|
AuthorizationCodeMixin,
|
|
ClientMixin,
|
|
TokenMixin,
|
|
)
|
|
from layercake.dateutils import fromisoformat, now
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class User:
|
|
id: str
|
|
name: str
|
|
email: str
|
|
email_verified: bool = False
|
|
scope: list[str] | None = None
|
|
|
|
def get_user_id(self):
|
|
return self.id
|
|
|
|
|
|
class OAuth2AuthorizationCode(AuthorizationCodeMixin):
|
|
def __init__(
|
|
self,
|
|
user_id: str,
|
|
code: str,
|
|
client_id: str,
|
|
redirect_uri: str,
|
|
response_type: str,
|
|
scope: str,
|
|
code_challenge: str | None = None,
|
|
code_challenge_method: str | None = None,
|
|
nonce: str | None = None,
|
|
**kwargs,
|
|
) -> None:
|
|
self.user_id = user_id
|
|
self.code = code
|
|
self.client_id = client_id
|
|
self.redirect_uri = redirect_uri
|
|
self.response_type = response_type
|
|
self.scope = scope
|
|
self.code_challenge = code_challenge
|
|
self.code_challenge_method = code_challenge_method
|
|
self.nonce = nonce
|
|
|
|
created_at = kwargs.get('created_at', '')
|
|
auth_time = fromisoformat(created_at) or now()
|
|
self.auth_time = int(auth_time.timestamp())
|
|
|
|
def get_redirect_uri(self) -> str:
|
|
return self.redirect_uri
|
|
|
|
def get_scope(self) -> str:
|
|
return self.scope
|
|
|
|
def get_nonce(self) -> str | None:
|
|
return self.nonce
|
|
|
|
def get_auth_time(self) -> int:
|
|
return self.auth_time
|
|
|
|
def get_acr(self):
|
|
return '0'
|
|
|
|
def get_amr(self) -> list:
|
|
return []
|
|
|
|
|
|
class OAuth2Token(TokenMixin):
|
|
def __init__(
|
|
self,
|
|
user: dict,
|
|
client_id: str,
|
|
scope: str,
|
|
expires_in: int,
|
|
issued_at: int,
|
|
access_token: str | None = None,
|
|
refresh_token: str | None = None,
|
|
**_,
|
|
) -> None:
|
|
self.user = user
|
|
self.client_id = client_id
|
|
self.scope = scope
|
|
self.expires_in = expires_in
|
|
self.issued_at = issued_at
|
|
self.access_token = access_token
|
|
self.refresh_token = refresh_token
|
|
|
|
def get_user(self) -> User:
|
|
return User(**self.user)
|
|
|
|
def check_client(self, client: ClientMixin) -> bool:
|
|
return self.client_id == client.get_client_id()
|
|
|
|
def get_scope(self) -> str:
|
|
return self.scope
|
|
|
|
def get_expires_in(self) -> int:
|
|
return self.expires_in
|
|
|
|
def is_revoked(self) -> bool:
|
|
return False
|
|
|
|
def is_expired(self) -> bool:
|
|
if not self.expires_in:
|
|
return False
|
|
|
|
expires_at = self.issued_at + self.expires_in
|
|
return expires_at < time.time()
|