Source code for src.core.services.auth_service

# Copyright (C) 2025 Raccoon Survey org
# This file is part of Raccoon Survey.
# Raccoon Survey is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License v3 as published by
# the Free Software Foundation.
# See the LICENSE file distributed with this program for details.

from __future__ import annotations

from datetime import timedelta

from flask_jwt_extended import create_access_token, create_refresh_token
from werkzeug.security import check_password_hash

from src.core.models import User

from . import jwt_blocklist


[docs] def build_claims(user: User) -> dict: """Builds JWT claims from a user object. Args: user (User): User object Returns: dict: JWT claims """ return { "role": getattr(getattr(user, "role", None), "name", None), "team_id": getattr(user, "team_id", None), "name": getattr(user, "name", None), }
[docs] def verify_user_active_role(user: User) -> bool: """Verifies if a user has an active role. Args: user (User): User object Returns: bool: True if user has an active role, False otherwise """ role = getattr(user, "role", None) return bool(role) and bool(getattr(role, "state", True))
[docs] def check_password(user: User, password: str) -> bool: """Checks if a password matches the user's password hash. Args: user (User): User object password (str): Password to check Returns: bool: True if password matches, False otherwise """ return check_password_hash(user.password_hash, password)
[docs] def create_tokens( user: User, access_expires_seconds: int, refresh_expires_seconds: int ) -> dict: """Creates access and refresh tokens for a user. Args: user (User): User object access_expires_seconds (int): Access token expiration time in seconds refresh_expires_seconds (int): Refresh token expiration time in seconds Returns: dict: A dictionary containing access and refresh tokens """ claims = build_claims(user) access_expires = timedelta(seconds=int(access_expires_seconds)) refresh_expires = timedelta(seconds=int(refresh_expires_seconds)) access_token = create_access_token( identity=str(user.id), additional_claims=claims, expires_delta=access_expires ) refresh_token = create_refresh_token( identity=str(user.id), additional_claims=claims, expires_delta=refresh_expires ) return {"access_token": access_token, "refresh_token": refresh_token}
[docs] def refresh_access_token( identity: str, claims: dict, access_expires_seconds: int ) -> dict: """Refreshes an access token for a user. Args: identity (str): User identity claims (dict): JWT claims access_expires_seconds (int): Access token expiration time in seconds Returns: dict: A dictionary containing a new access token """ access_expires = timedelta(seconds=int(access_expires_seconds)) access_token = create_access_token( identity=identity, additional_claims=claims, expires_delta=access_expires ) return {"access_token": access_token}
[docs] def revoke_token(jti: str | None) -> None: """Revokes a JWT token. Args: jti (str | None): JWT token ID """ if jti: jwt_blocklist.revoke_token(jti)
[docs] def profile_from_jwt(identity: str | None, jwt_payload: dict) -> dict: """Builds a user profile from JWT payload. Args: identity (str | None): User identity jwt_payload (dict): JWT payload Returns: dict: User profile """ return { "id": identity, "role": jwt_payload.get("role"), "team_id": jwt_payload.get("team_id"), "name": jwt_payload.get("name"), }
__all__ = [ "build_claims", "check_password", "create_tokens", "profile_from_jwt", "refresh_access_token", "revoke_token", "verify_user_active_role", ]