"""Logto OIDC helpers for external sign-in.""" import logging import os from authlib.integrations.flask_client import OAuth REQUIRED_LOGTO_VARS = ( 'LOGTO_OIDC_ENDPOINT', 'LOGTO_APP_ID', 'LOGTO_APP_SECRET', ) oauth = OAuth() def logto_sso_available() -> bool: return all(os.environ.get(k) for k in REQUIRED_LOGTO_VARS) def _endpoint() -> str: return os.environ['LOGTO_OIDC_ENDPOINT'].rstrip('/') def init_logto_oauth(app): """Register Authlib OAuth client when Logto env vars are present.""" oauth.init_app(app) if not logto_sso_available(): return endpoint = _endpoint() oauth.register( name='logto', client_id=app.config['LOGTO_APP_ID'], client_secret=app.config['LOGTO_APP_SECRET'], server_metadata_url=f'{endpoint}/oidc/.well-known/openid-configuration', client_kwargs={'scope': 'openid email profile'}, ) def extract_logto_email(oauth_client, token) -> str | None: """Resolve the user's email from Logto token claims or the userinfo endpoint.""" if not isinstance(token, dict): return None def normalize(value) -> str | None: if not value or not isinstance(value, str): return None normalized = value.strip().lower() return normalized if '@' in normalized else None userinfo = token.get('userinfo') for source in (userinfo, token): if not source: continue email = normalize(source.get('email')) if email: return email email = normalize(source.get('preferred_username')) if email: return email try: fetched = oauth_client.userinfo(token=token) email = normalize(fetched.get('email')) if email: return email email = normalize(fetched.get('preferred_username')) if email: return email logging.warning( 'Logto userinfo did not include an email claim (scope=%s)', token.get('scope'), ) except Exception: logging.exception('Failed to fetch Logto userinfo') return None