Ask AI

Source code for dagster_airlift.mwaa.auth

from typing import Any, Optional, Tuple

import boto3
import requests

from dagster_airlift.core.airflow_instance import AirflowAuthBackend


def get_session_info(mwaa: Any, env_name: str) -> Tuple[str, str]:
    # Initialize MWAA client and request a web login token
    response = mwaa.create_web_login_token(Name=env_name)

    # Extract the web server hostname and login token
    web_server_host_name = response["WebServerHostname"]
    web_token = response["WebToken"]

    # Construct the URL needed for authentication
    login_url = f"https://{web_server_host_name}/aws_mwaa/login"
    login_payload = {"token": web_token}

    # Make a POST request to the MWAA login url using the login payload
    response = requests.post(login_url, data=login_payload, timeout=10)

    # Check if login was succesfull
    if response.status_code == 200:
        # Return the hostname and the session cookie
        return (web_server_host_name, response.cookies["session"])
    else:
        raise Exception(f"Failed to get session info: {response.text}")


[docs] class MwaaSessionAuthBackend(AirflowAuthBackend): """A :py:class:`dagster_airlift.core.AirflowAuthBackend` that authenticates to AWS MWAA. Under the hood, this class uses the MWAA boto3 session to request a web login token and then uses the token to authenticate to the MWAA web server. Args: mwaa_session (boto3.Session): The boto3 MWAA session env_name (str): The name of the MWAA environment Examples: Creating an AirflowInstance pointed at a MWAA environment. .. code-block:: python import boto3 from dagster_airlift.mwaa import MwaaSessionAuthBackend from dagster_airlift.core import AirflowInstance boto_client = boto3.client("mwaa") af_instance = AirflowInstance( name="my-mwaa-instance", auth_backend=MwaaSessionAuthBackend( mwaa_client=boto_client, env_name="my-mwaa-env" ) ) """ def __init__(self, mwaa_client: Any, env_name: str) -> None: self.mwaa_client = mwaa_client self.env_name = env_name # Session info is generated when we either try to retrieve a session or retrieve the web server url self._session_info: Optional[Tuple[str, str]] = None @staticmethod def from_profile(region: str, env_name: str, profile_name: Optional[str] = None): boto_session = boto3.Session(profile_name=profile_name, region_name=region) mwaa = boto_session.client("mwaa") return MwaaSessionAuthBackend(mwaa_client=mwaa, env_name=env_name) def get_session(self) -> requests.Session: # Get the session info if not self._session_info: self._session_info = get_session_info(mwaa=self.mwaa_client, env_name=self.env_name) session_cookie = self._session_info[1] # Create a new session session = requests.Session() session.cookies.set("session", session_cookie) # Return the session return session def get_webserver_url(self) -> str: if not self._session_info: self._session_info = get_session_info(mwaa=self.mwaa_client, env_name=self.env_name) return f"https://{self._session_info[0]}"