Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling


Apache Airflow is a popular platform for enterprises looking to orchestrate complex data pipelines and workflows. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that streamlines the setup and operation of secure and highly available Airflow environments in the cloud.

In this post, we’re excited to introduce two new features that address common customer challenges and unlock new possibilities for building robust, scalable, and flexible data orchestration solutions using Amazon MWAA. First, the Airflow REST API support enables programmatic interaction with Airflow resources like connections, Directed Acyclic Graphs (DAGs), DAGRuns, and Task instances. Second, the option to horizontally scale web server capacity helps you handle increased demand, whether from REST API requests, command line interface (CLI) usage, or more concurrent Airflow UI users. Both features are available for all actively supported Amazon MWAA versions, including version 2.4.3 and newer.

Airflow REST API support

A frequently requested feature from Amazon MWAA customers has been the ability to interact with their workflows programmatically using Airflow’s APIs. The introduction of REST API support in Amazon MWAA addresses this need, providing a standardized way to access and manage your Airflow environment. With the new REST API, you can now invoke DAG runs, manage datasets, or get the status of Airflow’s metadata database, triggerer, and scheduler, all without relying on the Airflow web UI or CLI.

Another example is building monitoring dashboards that aggregate the status of your DAGs across multiple Amazon MWAA environments, or invoking workflows in response to events from external systems, such as completed database jobs or new user signups.
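As a rough illustration of the dashboard idea, the sketch below counts DAG run states per environment. It assumes each environment's DAG run listing has already been fetched and parsed, and that the JSON body carries a dag_runs list whose entries have a state field; the function name and the input layout are hypothetical.

```python
from collections import Counter


def summarize_dag_run_states(runs_by_environment):
    """Count DAG run states per environment.

    runs_by_environment maps an environment name to the parsed JSON body
    of a DAG run listing call for that environment (assumed shape:
    {"dag_runs": [{"state": "..."}, ...]}).
    """
    summary = {}
    for env_name, payload in runs_by_environment.items():
        states = Counter(
            run.get("state", "unknown") for run in payload.get("dag_runs", [])
        )
        summary[env_name] = dict(states)
    return summary
```

A dashboard could call this periodically and plot the per-environment counts of failed or running DAG runs.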

This feature opens up a world of possibilities for integrating your Amazon MWAA environments with other systems and building custom solutions that use the power of your data orchestration pipelines.

To demonstrate this new capability, we use the REST API to invoke a new DAG run. Follow the process detailed in the following sections.

Authenticate with the Airflow REST API

For a user to authenticate with the REST API, they need the necessary permissions to create a web login token, similar to how it works with the Airflow UI. Refer to Creating an Apache Airflow web login token for more details. The user’s AWS Identity and Access Management (IAM) role or policy must include the CreateWebLoginToken permission to generate a token for authenticating. Additionally, the user’s permissions for interacting with the REST API are determined by the Airflow role assigned to them within Amazon MWAA. The Airflow roles govern the user’s ability to perform various operations, such as invoking DAG runs, checking statuses, or modifying configurations, through the REST API endpoints.
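To make the IAM side more concrete, the following sketch builds a policy document that allows creating a web login token for one environment and one Airflow role. The action name airflow:CreateWebLoginToken is the IAM form of the permission named above; the resource ARN layout, the account ID, the environment name, and the Airflow role are assumptions for illustration and should be checked against the Amazon MWAA documentation.

```python
import json


def build_web_login_policy(region, account_id, env_name, airflow_role):
    """Return an IAM policy document (as a dict) that permits creating
    a web login token for one environment and one Airflow role.

    The ARN layout below is an assumption; verify it against the
    Amazon MWAA documentation before using it.
    """
    resource = f"arn:aws:airflow:{region}:{account_id}:role/{env_name}/{airflow_role}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "airflow:CreateWebLoginToken",
                "Resource": resource,
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder values for illustration only
    policy = build_web_login_policy("us-west-2", "111122223333", "MyMWAAEnvironment", "Op")
    print(json.dumps(policy, indent=2))
```

The Airflow role in the resource (for example Viewer, User, Op, or Admin) is what determines which REST API operations the caller can perform after login.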

The following is an example of the authentication process:

def get_session_info(region, env_name):
    """
    Retrieves the web server hostname and session cookie for an MWAA environment.

    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """

    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize MWAA client and request a web login token
        mwaa = boto3.client('mwaa', region_name=region)
        response = mwaa.create_web_login_token(Name=env_name)

        # Extract the web server hostname and login token
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]

        # Construct the URL needed for authentication
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Make a POST request to the MWAA login URL using the login payload
        response = requests.post(
            login_url,
            data=login_payload,
            timeout=10
        )

        # Check if login was successful
        if response.status_code == 200:
            # Return the hostname and the session cookie
            return (
                web_server_host_name,
                response.cookies["session"]
            )
        else:
            # Log an error and return a pair so callers can unpack the result
            logging.error("Failed to log in: HTTP %d", response.status_code)
            return None, None
    except requests.RequestException as e:
        # Log any exceptions raised during the request to the MWAA login endpoint
        logging.error("Request failed: %s", str(e))
        return None, None
    except Exception as e:
        # Log any other unexpected exceptions
        logging.error("An unexpected error occurred: %s", str(e))
        return None, None

The get_session_info function uses the AWS SDK for Python (Boto3) and the Python requests library for the initial steps required for authentication, retrieving a web token and a session cookie, which is valid for 12 hours. These will be used for subsequent REST API requests.
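Because the session cookie stays valid for 12 hours, a long-running client can reuse it instead of logging in before every request. The following is a minimal sketch of that idea; the SessionCache class, its refresh margin, and the injectable clock are hypothetical helpers, and fetch stands for any callable that returns a (hostname, cookie) pair the way get_session_info does.

```python
import time

SESSION_TTL_SECONDS = 12 * 60 * 60  # cookie lifetime stated in the text


class SessionCache:
    """Caches a (hostname, cookie) pair and refreshes it shortly before expiry."""

    def __init__(self, fetch, ttl=SESSION_TTL_SECONDS, margin=300, clock=time.monotonic):
        self._fetch = fetch      # callable returning (hostname, cookie)
        self._ttl = ttl
        self._margin = margin    # refresh this many seconds early (assumed value)
        self._clock = clock
        self._value = None
        self._fetched_at = None

    def get(self):
        """Return the cached pair, fetching a new one if it is missing or near expiry."""
        now = self._clock()
        expired = (
            self._value is None
            or now - self._fetched_at >= self._ttl - self._margin
        )
        if expired:
            hostname, cookie = self._fetch()
            if not hostname or not cookie:
                raise RuntimeError("could not obtain a session")
            self._value = (hostname, cookie)
            self._fetched_at = now
        return self._value
```

For example, cache = SessionCache(lambda: get_session_info(region, env_name)) creates the cache, and cache.get() can then be called before each request.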

Invoke the Airflow REST API endpoint

When authentication is complete, you have the credentials to start sending requests to the API endpoints. In the following example, we use the endpoint /dags/{dag_id}/dagRuns to initiate a DAG run:

def trigger_dag(region, env_name, dag_name):
    """
    Triggers a DAG in a specified MWAA environment using the Airflow REST API.

    Args:
        region (str): AWS region where the MWAA environment is hosted.
        env_name (str): Name of the MWAA environment.
        dag_name (str): Name of the DAG to trigger.
    """

    logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")

    # Retrieve the web server hostname and session cookie for authentication
    try:
        web_server_host_name, session_cookie = get_session_info(region, env_name)
        if not session_cookie:
            logging.error("Authentication failed, no session cookie retrieved.")
            return
    except Exception as e:
        logging.error(f"Error retrieving session info: {str(e)}")
        return

    # Prepare the cookies and payload for the request
    cookies = {"session": session_cookie}
    json_body = {"conf": {}}

    # Construct the URL for triggering the DAG (the DAG name is the dag_id path parameter)
    url = f"https://{web_server_host_name}/api/v1/dags/{dag_name}/dagRuns"

    # Send the POST request to trigger the DAG
    try:
        response = requests.post(url, cookies=cookies, json=json_body, timeout=10)
        # Check the response status code to determine if the DAG was triggered successfully
        if response.status_code == 200:
            logging.info("DAG triggered successfully.")
        else:
            logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
    except requests.RequestException as e:
        logging.error(f"Request to trigger DAG failed: {str(e)}")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Check if the correct number of arguments is provided
    if len(sys.argv) != 4:
        logging.error("Incorrect usage. Proper format: python script_name.py <region> <env_name> <dag_name>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    dag_name = sys.argv[3]

    # Trigger the DAG with the provided arguments
    trigger_dag(region, env_name, dag_name)

The following is the complete code of trigger_dag.py:

import sys
import boto3
import requests
import logging

def get_session_info(region, env_name):
    """
    Retrieves the web server hostname and session cookie for an MWAA environment.

    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """

    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize MWAA client and request a web login token
        mwaa = boto3.client('mwaa', region_name=region)
        response = mwaa.create_web_login_token(Name=env_name)

        # Extract the web server hostname and login token
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]

        # Construct the URL needed for authentication
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Make a POST request to the MWAA login URL using the login payload
        response = requests.post(
            login_url,
            data=login_payload,
            timeout=10
        )

        # Check if login was successful
        if response.status_code == 200:
            # Return the hostname and the session cookie
            return (
                web_server_host_name,
                response.cookies["session"]
            )
        else:
            # Log an error and return a pair so callers can unpack the result
            logging.error("Failed to log in: HTTP %d", response.status_code)
            return None, None
    except requests.RequestException as e:
        # Log any exceptions raised during the request to the MWAA login endpoint
        logging.error("Request failed: %s", str(e))
        return None, None
    except Exception as e:
        # Log any other unexpected exceptions
        logging.error("An unexpected error occurred: %s", str(e))
        return None, None

def trigger_dag(region, env_name, dag_name):
    """
    Triggers a DAG in a specified MWAA environment using the Airflow REST API.

    Args:
        region (str): AWS region where the MWAA environment is hosted.
        env_name (str): Name of the MWAA environment.
        dag_name (str): Name of the DAG to trigger.
    """

    logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")

    # Retrieve the web server hostname and session cookie for authentication
    try:
        web_server_host_name, session_cookie = get_session_info(region, env_name)
        if not session_cookie:
            logging.error("Authentication failed, no session cookie retrieved.")
            return
    except Exception as e:
        logging.error(f"Error retrieving session info: {str(e)}")
        return

    # Prepare the cookies and payload for the request
    cookies = {"session": session_cookie}
    json_body = {"conf": {}}

    # Construct the URL for triggering the DAG (the DAG name is the dag_id path parameter)
    url = f"https://{web_server_host_name}/api/v1/dags/{dag_name}/dagRuns"

    # Send the POST request to trigger the DAG
    try:
        response = requests.post(url, cookies=cookies, json=json_body, timeout=10)
        # Check the response status code to determine if the DAG was triggered successfully
        if response.status_code == 200:
            logging.info("DAG triggered successfully.")
        else:
            logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
    except requests.RequestException as e:
        logging.error(f"Request to trigger DAG failed: {str(e)}")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Check if the correct number of arguments is provided
    if len(sys.argv) != 4:
        logging.error("Incorrect usage. Proper format: python script_name.py <region> <env_name> <dag_name>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    dag_name = sys.argv[3]

    # Trigger the DAG with the provided arguments
    trigger_dag(region, env_name, dag_name)

Run the request script

Run the request script with the following code, providing your AWS Region, Amazon MWAA environment name, and DAG name:

python3 trigger_dag.py <region> <env_name> <dag_name>

Validate the API result

The following screenshot shows the result in the CLI.

Check the DAG run in the Airflow UI

The following screenshot shows the DAG run status in the Airflow UI.

You can use any other endpoint in the REST API to enable programmatic control, automation, integration, and management of Airflow workflows and resources. To learn more about the Airflow REST API and its various endpoints, refer to the Airflow documentation.
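As one more illustration, the Airflow REST API exposes a health endpoint that reports the status of the metadata database, scheduler, and triggerer. The sketch below assumes the hostname and session cookie were obtained through the login flow described earlier. It uses only the Python standard library rather than requests, the helper names are hypothetical, and the response shape should be confirmed against the Airflow documentation for your version.

```python
import json
import urllib.request


def unhealthy_components(health_payload):
    """Return the names of components whose reported status is not 'healthy'.

    Components that report no status (for example, a component that is
    not deployed) are skipped.
    """
    problems = []
    for name, details in health_payload.items():
        status = (details or {}).get("status")
        if status is not None and status != "healthy":
            problems.append(name)
    return sorted(problems)


def check_health(web_server_host_name, session_cookie):
    """Call GET /api/v1/health and return the list of unhealthy components."""
    url = f"https://{web_server_host_name}/api/v1/health"
    request = urllib.request.Request(
        url, headers={"Cookie": f"session={session_cookie}"}
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return unhealthy_components(json.load(response))
```

An empty list from check_health means every component that reported a status was healthy.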

Web server auto scaling

Another key request from Amazon MWAA customers has been the ability to dynamically scale their web servers to handle fluctuating workloads. Previously, you were constrained by two web servers provided with an Airflow environment on Amazon MWAA and had no way to horizontally scale web server capacity, which could lead to performance issues during peak loads. The new web server auto scaling feature in Amazon MWAA solves this problem. By automatically scaling the number of web servers based on CPU utilization and active connection count, Amazon MWAA makes sure your Airflow environment can seamlessly accommodate increased demand, whether from REST API requests, CLI usage, or more concurrent Airflow UI users.

Set up web server auto scaling

To set up auto scaling for your Amazon MWAA environment web servers, follow these steps:

  1. On the Amazon MWAA console, navigate to the environment you want to configure auto scaling for.
  2. Choose Edit.
  3. Choose Next.
  4. On the Configure advanced settings page, in the Environment class section, add the maximum and minimum web server count. For this example, we set the upper limit to 5 and lower limit to 2.

These settings allow Amazon MWAA to automatically scale up the Airflow web server when demand increases and scale down conservatively when demand decreases, optimizing resource usage and cost.
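The same limits can, in principle, be set programmatically instead of through the console. The following is a minimal sketch that assumes the Amazon MWAA UpdateEnvironment API accepts the MinWebservers and MaxWebservers parameters, as recent Boto3 releases expose them. The helper names are hypothetical, and the allowed range for your environment class should be checked in the Amazon MWAA documentation.

```python
def build_web_server_scaling_args(env_name, min_webservers, max_webservers):
    """Validate the limits and return keyword arguments for update_environment."""
    if min_webservers < 1:
        raise ValueError("min_webservers must be at least 1")
    if max_webservers < min_webservers:
        raise ValueError("max_webservers must be >= min_webservers")
    return {
        "Name": env_name,
        "MinWebservers": min_webservers,
        "MaxWebservers": max_webservers,
    }


def set_web_server_scaling(region, env_name, min_webservers, max_webservers):
    """Apply the limits to an existing environment (this starts an environment update)."""
    import boto3  # imported lazily so the validation helper has no AWS dependency

    mwaa = boto3.client("mwaa", region_name=region)
    return mwaa.update_environment(
        **build_web_server_scaling_args(env_name, min_webservers, max_webservers)
    )
```

With the values from the console example, the call would be set_web_server_scaling(region, env_name, 2, 5).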

Trigger auto scaling programmatically

After you configure auto scaling, you might want to test how it behaves under simulated conditions. Using the Python code structure we discussed earlier for invoking a DAG, you can also use the Airflow REST API to simulate a load test and see how well your auto scaling setup responds. For the purpose of load testing, we have configured our Amazon MWAA environment with an mw1.small instance class. The following is an example implementation using load_test.py:

import sys
import time
import boto3
import requests
import logging
import concurrent.futures

def get_session_info(region, env_name):
    """
    Retrieves the web server hostname and session cookie for an MWAA environment.

    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """
    try:
        # Create an MWAA client in the specified region
        mwaa = boto3.client('mwaa', region_name=region)
        # Request a web login token for the specified environment
        response = mwaa.create_web_login_token(Name=env_name)
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]

        # Construct the login URL and payload for authentication
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Authenticate and obtain the session cookie
        response = requests.post(login_url, data=login_payload, timeout=10)
        if response.status_code == 200:
            return web_server_host_name, response.cookies["session"]
        else:
            logging.error(f"Failed to log in: HTTP {response.status_code}")
            return None, None
    except requests.RequestException as e:
        logging.error(f"Request failed: {e}")
        return None, None
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return None, None

def call_rest_api(web_server_host_name, session_cookie):
    """
    Calls the Airflow web server API to fetch details of all DAGs and measures the time taken for the call.

    Args:
        web_server_host_name (str): The hostname of the MWAA web server.
        session_cookie (str): The session cookie for authentication.
    """
    # Define the endpoint for fetching all DAGs
    url = f"https://{web_server_host_name}/api/v1/dags"
    headers = {'Content-Type': 'application/json', 'Cookie': f'session={session_cookie}'}

    try:
        start_time = time.time()
        response = requests.get(url, headers=headers)
        elapsed_time = time.time() - start_time

        if response.status_code == 200:
            logging.info(f"API call successful, fetched {len(response.json()['dags'])} DAGs, took {elapsed_time:.2f} seconds")
        else:
            logging.error(f"API call failed with status code: {response.status_code}, took {elapsed_time:.2f} seconds")
    except requests.RequestException as e:
        logging.error(f"Error during API call: {e}")

def run_load_test(web_server_host_name, session_cookie, qps, duration):
    """
    Performs a load test by sending concurrent requests at a specified rate over a given duration.

    Args:
        web_server_host_name (str): The hostname of the MWAA web server.
        session_cookie (str): The session cookie for authentication.
        qps (int): Queries per second.
        duration (int): Duration of the test in seconds.
    """
    interval = 1.0 / qps
    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=qps) as executor:
        while time.time() - start_time < duration:
            # Submit one batch of qps requests and wait for all of them to finish
            futures = [executor.submit(call_rest_api, web_server_host_name, session_cookie) for _ in range(qps)]
            concurrent.futures.wait(futures)
            time.sleep(interval)

    logging.info("Load test completed.")

def main(region, env_name, qps, duration):
    """
    Main function to execute the load testing script.

    Args:
        region (str): AWS region where the MWAA environment is hosted.
        env_name (str): Name of the MWAA environment.
        qps (int): Queries per second.
        duration (int): Duration in seconds.
    """
    web_server_host_name, session_cookie = get_session_info(region, env_name)
    if not web_server_host_name or not session_cookie:
        logging.error("Failed to retrieve session information")
        return

    run_load_test(web_server_host_name, session_cookie, qps, duration)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    if len(sys.argv) != 5:
        logging.error("Incorrect usage. Proper format: python load_test.py <region> <env_name> <qps> <duration>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    qps = int(sys.argv[3])
    duration = int(sys.argv[4])

    main(region, env_name, qps, duration)

The Python code uses thread pooling and concurrency concepts to help test the auto scaling performance of your web server by simulating traffic. This script automates the process of sending a specific number of requests per second to your web server, enabling you to trigger an auto scaling event.

You can use the following command to run the script. You have to provide the Region, Amazon MWAA environment name, how many queries per second you want to run against the web server, and the duration for which you want the load test to run.

python load_test.py <region> <env_name> <qps> <duration>

For example:

python load_test.py us-west-2 MyMWAAEnvironment 10 1080

The preceding command will run 10 queries per second for 18 minutes.

When the script is running, you’ll start seeing rows that show how long (in seconds) it took for the web server to process the request.

This time will gradually start to increase. As active connection count or CPU usage increase, Amazon MWAA will dynamically scale the web servers to accommodate the load.

As new web servers come online, your environment will be able to handle increased load, and the response time will drop. Amazon MWAA provides web server container metrics in the AWS/MWAA service namespace in Amazon CloudWatch, allowing you to monitor the web server performance. The following screenshots show an example of the auto scaling event.
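To see which metrics are published for your environment, one option is to list the AWS/MWAA namespace with Boto3. The sketch below takes an already created CloudWatch client (for example, boto3.client("cloudwatch", region_name=region)) and follows pagination tokens. The helper name is hypothetical, and no specific metric names are assumed.

```python
def list_mwaa_metric_names(cloudwatch, namespace="AWS/MWAA"):
    """Return the sorted, de-duplicated metric names in the given namespace.

    cloudwatch is expected to behave like a Boto3 CloudWatch client,
    that is, to offer list_metrics(Namespace=..., NextToken=...).
    """
    names = set()
    kwargs = {"Namespace": namespace}
    while True:
        page = cloudwatch.list_metrics(**kwargs)
        for metric in page.get("Metrics", []):
            names.add(metric["MetricName"])
        token = page.get("NextToken")
        if not token:
            break
        # Request the next page of results
        kwargs["NextToken"] = token
    return sorted(names)
```

The returned names can then be used to build CloudWatch dashboards or alarms around the web server metrics while the load test runs.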

Recommendation

Determining the appropriate minimum and maximum web server count involves carefully considering your typical workload patterns, performance requirements, and cost constraints. To set these values, consider metrics like the required REST API throughput at peak times and the maximum number of concurrent UI users you expect to have. It’s important to note that Amazon MWAA can support up to 10 queries per second (QPS) for the Airflow REST API at full scale for any environment size, provided you follow the recommended number of DAGs.

Amazon MWAA integration with CloudWatch provides granular metrics and monitoring capabilities to help you find the optimal configuration for your specific use case. If you anticipate periods of consistently high demand or increased workloads for an extended duration, you can configure your Amazon MWAA environment to maintain a higher minimum number of web servers. By setting the minimum web server setting to 2 or more, you can make sure your environment always has sufficient capacity to handle load peaks without needing to wait for auto scaling to provision additional resources. This comes at the cost of running more web server instances, which is a trade-off between cost-optimization and responsiveness.

Conclusion

Today, we’re announcing the availability of the Airflow REST API and web server auto scaling in Amazon MWAA. The REST API provides a standardized way to programmatically interact with and manage resources in your Amazon MWAA environments. This enables seamless integration, automation, and extensibility of Amazon MWAA within your organization’s existing data and application landscape. With web server auto scaling, you can automatically increase the number of web server instances based on resource utilization, and Amazon MWAA makes sure your Airflow workflows can handle fluctuating workloads without manual intervention.

These features lay the foundation for you to build more robust, scalable, and flexible data orchestration pipelines. We encourage you to use them to streamline your data engineering operations and unlock new possibilities for your business.

To start building with Amazon MWAA, see Get started with Amazon Managed Workflows for Apache Airflow.

Stay tuned for future updates and enhancements to Amazon MWAA that will continue to enhance the developer experience and unlock new opportunities for data-driven organizations.


About the Authors

Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.

Kartikay Khator is a Solutions Architect within the Global Life Sciences at AWS, where he dedicates his efforts to developing innovative and scalable solutions that cater to the evolving needs of customers. His expertise lies in harnessing the capabilities of AWS Analytics services. Extending beyond his professional pursuits, he finds joy and fulfillment in the world of running and hiking. Having already completed two marathons, he is currently preparing for his next marathon challenge.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect, MWAA and AWS Glue ETL expert. He’s on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest MWAA and AWS Glue features and news!
