Skip to content
Snippets Groups Projects

Resolve "Invite a user to clowm via email"

Merged Daniel Göbel requested to merge feature/9-invite-a-user-to-clowm-via-email into main
19 files
+ 929
248
Compare changes
  • Side-by-side
  • Inline
Files
19
+ 91
49
import time
import urllib
import urllib.parse
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, BackgroundTasks, Query, Request, status
from fastapi.responses import RedirectResponse
@@ -14,33 +16,46 @@ from clowm.core.oidc import LoginException, OIDCClient
from clowm.crud import CRUDUser
from clowm.models import User
from clowm.otlp import start_as_current_span_async
from clowm.smtp.send_email import send_first_login_email
from ..background.initialize_users import initialize_user
from ..dependencies import DBSession, OIDCClientDep, RGWService
from ..utils import create_rgw_user
router = APIRouter(prefix="/auth", tags=["Auth"])
tracer = trace.get_tracer_provider().get_tracer(__name__)
NEXT_PATH_KEY = "NEXT"
INVITATION_UID_KEY = "INVITATION_UID"
def build_url(base_url: str, *path: str) -> AnyHttpUrl:
# Returns a list in the structure of urlparse.ParseResult
url_parts = list(urllib.parse.urlparse(base_url))
url_parts[2] = "/".join(path)
return AnyHttpUrl(urllib.parse.urlunparse(url_parts))
def oidc_redirect_uri(provider: OIDCClient.OIDCProvider) -> AnyHttpUrl:
return AnyHttpUrl.build(
scheme=settings.ui_uri.scheme,
host=settings.ui_uri.host, # type: ignore[arg-type]
path="/".join([settings.ui_uri.path, settings.api_prefix, router.prefix[1:], "callback", provider.name]).strip( # type: ignore[list-item]
"/"
),
port=settings.ui_uri.port,
)
@router.get(
"/login",
status_code=status.HTTP_302_FOUND,
summary="Redirect to LifeScience OIDC Login",
summary="Kickstart the login flow",
)
@start_as_current_span_async("api_login_route", tracer=tracer)
async def login(
request: Request,
oidc_client: OIDCClientDep,
db: DBSession,
invitation_token: Annotated[
str | SkipJsonSchema[None],
Query(
min_length=43,
max_length=43,
description="Unique token to validate an invitation",
),
] = None,
provider: Annotated[
OIDCClient.OIDCProvider, Query(description="The OIDC provider to use for login")
] = OIDCClient.OIDCProvider.lifescience,
@@ -49,7 +64,7 @@ async def login(
Query(
alias="next",
max_length=128,
description="Will be appended to redirect response in the callback route as URL query parameter `next_path`",
description="Will be appended to redirect response in the callback route as URL query parameter `next`",
),
] = None,
) -> RedirectResponse:
@@ -64,9 +79,13 @@ async def login(
The wrapper around the oidc client. Dependency Injection.
request : fastapi.requests.Request
Raw request object.
db : sqlalchemy.ext.asyncio.AsyncSession.
Async database session to perform query on. Dependency Injection.
next_ : str | None
Query parameter that gets stored in the session cookie.
Will be appended to RedirectResponse in the callback route as URL query parameter 'next_path'
Query parameter that gets stored in the session cookie. Query Parameter.
Will be appended to RedirectResponse in the callback route as URL query parameter 'next'
invitation_token : str | None
Token from the invitation email to connect the created account with an identity provider. Query Parameter.
Returns
-------
@@ -75,10 +94,25 @@ async def login(
"""
# Clear session to prevent an overflow
request.session.clear()
current_span = trace.get_current_span()
current_span.set_attribute("provider", provider.name)
if next_:
current_span.set_attribute("next", next_)
request.session[NEXT_PATH_KEY] = next_
redirect_uri = build_url(str(settings.ui_uri), settings.api_prefix, router.prefix[1:], "callback", provider.name)
return await oidc_client.authorize_redirect(request, redirect_uri=redirect_uri, provider=provider)
if invitation_token is not None:
user = await CRUDUser.get_by_invitation_token(invitation_token, db=db)
if user is None:
raise LoginException(error_source="invalid invitation link")
current_span.set_attribute("invitation_uid", str(user.uid))
if (
round(time.time()) - (0 if user.invitation_token_created_at is None else user.invitation_token_created_at)
> 84200
):
raise LoginException(error_source="expired invitation link")
request.session[INVITATION_UID_KEY] = str(user.uid)
return await oidc_client.authorize_redirect(
request, redirect_uri=oidc_redirect_uri(provider=provider), provider=provider
)
@router.get(
@@ -141,54 +175,49 @@ async def login_callback(
path : str
Redirect path after successful login.
"""
redirect_path = "/"
redirect_path = str(settings.ui_uri)
current_span = trace.get_current_span()
current_span.set_attribute("provider", provider.name)
next_path: str | None = request.session.get(NEXT_PATH_KEY, None) # get return path from session cookie
if next_path is not None:
current_span.set_attribute("next", next_path)
redirect_path += f"?next={urllib.parse.quote_plus(next_path)}"
try:
user_info = await oidc_client.verify_fetch_userinfo(request=request, provider=provider)
lifescience_id = user_info.sub if isinstance(user_info.sub, str) else user_info.sub[0]
current_span.set_attribute("lifescience_id", lifescience_id)
lifescience_id = lifescience_id.split("@")[0]
user = await CRUDUser.get_by_lifescience_id(lifescience_id=lifescience_id, db=db)
lifescience_id = (user_info.sub if isinstance(user_info.sub, str) else user_info.sub[0]).split("@")[0]
current_span.set_attributes({"lifescience_id": lifescience_id, "raw_lifescience_id": user_info.sub})
# if we want to block foreign users and the user is None or has no role, reject this login attempt
if settings.block_foreign_users and (user is None or len(user.roles) == 0):
raise LoginException(error_source="Access denied")
# if user does not exist in system
if user is None:
# try to get user by smtp to get registered but not initialized user
user = await CRUDUser.get_by_email(db=db, email=user_info.email)
invitation_uid: str | None = request.session.get(INVITATION_UID_KEY, None)
# if the user was invited
if invitation_uid is not None:
current_span.set_attribute("invitation_uid", invitation_uid)
user = await CRUDUser.get(uid=UUID(invitation_uid), db=db)
if user is None:
raise LoginException(error_source="unknown user")
elif await CRUDUser.get_by_lifescience_id(lifescience_id=lifescience_id, db=db) is not None:
raise LoginException(error_source="lifescience account already connected to other account")
# update the invited user and initialize him
await CRUDUser.update_invited_user(
user.uid, lifescience_id=lifescience_id, display_name=user_info.name, email=user_info.email, db=db
)
create_rgw_user(user=user, background_tasks=background_tasks, rgw=rgw)
else:
user = await CRUDUser.get_by_lifescience_id(lifescience_id=lifescience_id, db=db)
# if we want to block foreign users and the user is None or has no role, reject this login attempt
if settings.block_foreign_users:
if user is None:
raise LoginException(error_source="Access denied")
elif len(user.roles) == 0:
raise LoginException(error_source="Access denied")
# if user does not exist in system, create a new user
if user is None:
# if user is not registered by admin, create a new user
user = await CRUDUser.create(
User(lifescience_id=lifescience_id, display_name=user_info.name, email=user_info.email),
db=db,
)
else: # if smtp is connected to a user, connect the user to this lifescience account
# if user is already connected to a lifescience account, reject this login attempt
if user.lifescience_id is not None:
raise LoginException("Email already connected to other account")
await CRUDUser.update_registered_user(
uid=user.uid, db=db, lifescience_id=lifescience_id, display_name=user_info.name
)
with tracer.start_as_current_span(
"rgw_create_user", attributes={"uid": str(user.uid), "display_name": user.display_name}
):
rgw.create_user(
uid=str(user.uid),
max_buckets=-1,
display_name=user.display_name,
)
background_tasks.add_task(initialize_user, user=user)
background_tasks.add_task(send_first_login_email, user=user)
elif user.email != user_info.email:
await CRUDUser.update_email(user.uid, user_info.email, db=db)
create_rgw_user(user=user, background_tasks=background_tasks, rgw=rgw)
elif user.email != user_info.email:
await CRUDUser.update_email(user.uid, user_info.email, db=db)
jwt = create_access_token(str(user.uid))
response.set_cookie(
key="bearer",
@@ -229,5 +258,18 @@ async def login_callback(
},
)
async def logout(response: RedirectResponse) -> str:
response.set_cookie(key="bearer", secure=True, max_age=0, domain=settings.ui_uri.host)
"""
Logout the user from the system by deleting the bearer cookie.
Parameters
----------
response : fastapi.responses.RedirectResponse
Response which will delete the JWT cookie.
Returns
-------
path : str
Redirect path after successful logout.
"""
response.delete_cookie(key="bearer", secure=True, domain=settings.ui_uri.host)
return str(settings.ui_uri)
Loading