Source code for src.core.routes.auth

# Copyright (C) 2025 Raccoon Survey org
# This file is part of Raccoon Survey.
# Raccoon Survey is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License v3 as published by
# the Free Software Foundation.
# See the LICENSE file distributed with this program for details.

from __future__ import annotations

from datetime import timedelta

from flask import Blueprint, current_app, g, jsonify, request
from flask_jwt_extended import (
    get_jwt,
    get_jwt_identity,
    jwt_required,
)

from src.core.middlewares.rbac import role_required
from src.core.middlewares.user_required import user_required
from src.core.services import auth_service

auth_bp = Blueprint("auth", __name__)


def _lifetime_seconds(config_key: str, fallback: int) -> int:
    """Return the token lifetime stored under *config_key* as whole seconds.

    Flask-JWT-Extended accepts its ``JWT_*_TOKEN_EXPIRES`` options either as
    a number of seconds or as a ``datetime.timedelta``; ``int()`` cannot
    convert a ``timedelta`` directly, so that case is handled explicitly.

    Args:
        config_key: Name of the Flask config entry to read.
        fallback: Seconds to use when the entry is absent.

    Returns:
        int: The configured lifetime in seconds.
    """
    configured = current_app.config.get(config_key, fallback)
    if isinstance(configured, timedelta):
        return int(configured.total_seconds())
    return int(configured)


@auth_bp.post("/login")
@user_required(source="json", key="email", field="email")
def login():
    """Authenticate a user by email and password and issue JWT tokens.

    The user object is read from ``g.current_user``; presumably the
    ``user_required`` decorator populates it from the JSON ``email`` field
    (confirm against the middleware).

    Returns:
        tuple: A JSON response and an HTTP status code:

        * 200 with the result of ``auth_service.create_tokens``.
        * 400 when the body is not a JSON object, or ``email``/``password``
          is missing, empty, or not a string.
        * 404 when ``g.current_user`` is absent or falsy.
        * 403 when ``auth_service.verify_user_active_role`` rejects the user.
        * 401 when ``auth_service.check_password`` rejects the password.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; treat it like an empty one
        # so the request is answered with 400 instead of crashing.
        payload = {}

    raw_email = payload.get("email")
    raw_password = payload.get("password")
    # Non-string values are treated as missing rather than raising on .strip().
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ""
    password = raw_password if isinstance(raw_password, str) else ""
    if not email or not password:
        return jsonify({"message": "email and password are required"}), 400

    # NOTE(review): the distinct 404/403/401 answers below let a caller tell
    # whether an email is registered — confirm this is intended.
    user = getattr(g, "current_user", None)
    if not user:
        return jsonify({"message": "user not found"}), 404
    if not auth_service.verify_user_active_role(user):
        return jsonify({"message": "unauthorized"}), 403
    if not auth_service.check_password(user, password):
        return jsonify({"message": "invalid credentials"}), 401

    access_expires_seconds = _lifetime_seconds("JWT_ACCESS_TOKEN_EXPIRES", 900)
    refresh_expires_seconds = _lifetime_seconds(
        "JWT_REFRESH_TOKEN_EXPIRES", 2592000
    )
    tokens = auth_service.create_tokens(
        user, access_expires_seconds, refresh_expires_seconds
    )
    return jsonify(tokens), 200
@auth_bp.post("/refresh")
@jwt_required(refresh=True)
def refresh():
    """Issue a new access token for the holder of a valid refresh token.

    The ``role``, ``team_id`` and ``name`` claims are copied from the
    presented refresh token and handed, together with its identity, to
    ``auth_service.refresh_access_token``.

    Returns:
        tuple: A JSON response built from the service result and HTTP 200.
    """
    identity = get_jwt_identity()
    jwt_payload = get_jwt()
    # Carry over only the claims the new access token should keep.
    claims = {
        claim: jwt_payload.get(claim) for claim in ("role", "team_id", "name")
    }

    # Flask-JWT-Extended allows this option to be a timedelta as well as a
    # number of seconds; int() cannot convert a timedelta directly.
    configured = current_app.config.get("JWT_ACCESS_TOKEN_EXPIRES", 900)
    if isinstance(configured, timedelta):
        access_expires_seconds = int(configured.total_seconds())
    else:
        access_expires_seconds = int(configured)

    result = auth_service.refresh_access_token(
        identity, claims, access_expires_seconds
    )
    return jsonify(result), 200
@auth_bp.post("/logout")
@jwt_required()
def logout():
    """Log the caller out by revoking the token used for this request.

    The ``jti`` claim of the JWT that authenticated the request is passed to
    ``auth_service.revoke_token``. Because the route is guarded by
    ``jwt_required()`` without ``refresh=True``, that is the access token.

    Returns:
        tuple: A JSON confirmation message and HTTP 200.
    """
    token_claims = get_jwt()
    auth_service.revoke_token(token_claims.get("jti"))
    confirmation = {"message": "logout successful"}
    return jsonify(confirmation), 200
@auth_bp.get("/me")
@role_required("admin", "rrhh")
def me():
    """Return the profile of the currently authenticated user.

    The profile is produced by ``auth_service.profile_from_jwt`` from the
    identity and claims of the request's JWT. The route is wrapped in
    ``role_required("admin", "rrhh")``, which presumably limits access to
    those roles (confirm against the RBAC middleware).

    Returns:
        tuple: A JSON response with the profile and HTTP 200.
    """
    token_claims = get_jwt()
    return (
        jsonify(
            auth_service.profile_from_jwt(get_jwt_identity(), token_claims)
        ),
        200,
    )