98 lines
2.3 KiB
Python
98 lines
2.3 KiB
Python
import time
|
|
|
|
from authlib.oauth2.rfc6749 import (
|
|
AuthorizationCodeMixin,
|
|
ClientMixin,
|
|
TokenMixin,
|
|
)
|
|
from layercake.dateutils import fromisoformat
|
|
|
|
|
|
class OAuth2AuthorizationCode(AuthorizationCodeMixin):
    """Authorization-code record exposed through authlib's ``AuthorizationCodeMixin``.

    Holds the values captured when the code was issued (user, client,
    redirect URI, scope, optional PKCE challenge and nonce) plus
    ``auth_time``, stored as integer epoch seconds.
    """

    def __init__(
        self,
        user_id: str,
        code: str,
        client_id: str,
        redirect_uri: str,
        response_type: str,
        scope: str,
        code_challenge: str | None = None,
        code_challenge_method: str | None = None,
        nonce: str | None = None,
        **kwargs,
    ) -> None:
        """Store the authorization-code fields and derive ``auth_time``.

        Args:
            user_id: Identifier of the user the code was issued for.
            code: The authorization code value.
            client_id: Identifier of the client the code was issued to.
            redirect_uri: Redirect URI associated with the code.
            response_type: Response type of the authorization request.
            scope: Scope string associated with the code.
            code_challenge: Optional code challenge value.
            code_challenge_method: Optional code challenge method.
            nonce: Optional nonce value.
            **kwargs: Extra fields; only ``created_at`` is read. It is passed
                to ``fromisoformat`` and, when that yields a truthy datetime,
                becomes the source of ``auth_time``. All other keys are
                ignored.
        """
        self.user_id = user_id
        self.code = code
        self.client_id = client_id
        self.redirect_uri = redirect_uri
        self.response_type = response_type
        self.scope = scope
        self.code_challenge = code_challenge
        self.code_challenge_method = code_challenge_method
        self.nonce = nonce

        # NOTE(review): presumably `fromisoformat` returns a falsy value
        # (e.g. None) for an empty/unparseable string - confirm against
        # layercake.dateutils.
        created_at = fromisoformat(kwargs.get('created_at', ''))
        if created_at:
            self.auth_time = int(created_at.timestamp())
        else:
            # No usable creation time: fall back to the current time.
            # `time.time()` is already imported by this module and yields
            # epoch seconds directly, so no extra "now" helper is required.
            self.auth_time = int(time.time())

    def get_redirect_uri(self) -> str:
        """Return the redirect URI stored on this code."""
        return self.redirect_uri

    def get_scope(self) -> str:
        """Return the scope string stored on this code."""
        return self.scope

    def get_nonce(self) -> str | None:
        """Return the stored nonce, or ``None`` when none was supplied."""
        return self.nonce

    def get_auth_time(self) -> int:
        """Return ``auth_time`` as integer epoch seconds."""
        return self.auth_time

    def get_acr(self) -> str:
        """Return the authentication context class reference.

        Always the constant ``'0'``.
        """
        return '0'

    def get_amr(self) -> list:
        """Return the authentication methods references.

        Always a new empty list.
        """
        return []
|
|
|
|
|
|
class OAuth2Token(TokenMixin):
    """In-memory token record exposed through authlib's ``TokenMixin``.

    Keeps the owning user, the client id, the scope and the timing fields
    (``issued_at`` / ``expires_in``) needed to answer expiry checks, along
    with the optional access and refresh token strings.
    """

    def __init__(
        self,
        user: dict,
        client_id: str,
        scope: str,
        expires_in: int,
        issued_at: int,
        access_token: str | None = None,
        refresh_token: str | None = None,
        **_,
    ) -> None:
        """Store the token fields; any extra keyword arguments are discarded."""
        # Token strings (both optional).
        self.access_token = access_token
        self.refresh_token = refresh_token

        # Ownership and granted scope.
        self.user = user
        self.client_id = client_id
        self.scope = scope

        # Timing: `issued_at + expires_in` is compared against `time.time()`
        # in `is_expired`.
        self.issued_at = issued_at
        self.expires_in = expires_in

    def get_user(self) -> dict:
        """Return the user mapping this token was created with."""
        return self.user

    def check_client(self, client: ClientMixin):
        """Tell whether ``client`` is the client this token belongs to.

        The stored ``client_id`` is compared for equality with
        ``client.get_client_id()``.
        """
        own_client_id = self.client_id
        return own_client_id == client.get_client_id()

    def get_scope(self) -> str:
        """Return the scope string stored on this token."""
        return self.scope

    def get_expires_in(self) -> int:
        """Return the stored ``expires_in`` value."""
        return self.expires_in

    def is_revoked(self) -> bool:
        """Report revocation status; this implementation always returns ``False``."""
        return False

    def is_expired(self) -> bool:
        """Return ``True`` once ``issued_at + expires_in`` lies before the current time.

        A falsy ``expires_in`` (``0`` or ``None``) means the token is never
        reported as expired.
        """
        if self.expires_in:
            deadline = self.issued_at + self.expires_in
            return deadline < time.time()
        return False
|