Introducing Amazon MWAA support for Apache Airflow version 2.9.2

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that significantly improves security and availability, and reduces infrastructure management overhead when setting up and operating end-to-end data pipelines in the cloud.

Today, we’re announcing the availability of Apache Airflow version 2.9.2 environments on Amazon MWAA. Apache Airflow 2.9.2 introduces several notable enhancements, such as new API endpoints for improved dataset management, advanced scheduling options including conditional expressions for dataset dependencies, the combination of dataset and time-based schedules, and custom names in dynamic task mapping for better readability of your DAGs.

In this post, we walk you through some of these new features and capabilities, how you can use them, and how you can set up or upgrade your Amazon MWAA environments to Airflow 2.9.2.

With each new version release, the Apache Airflow community is innovating to make Airflow more data-aware, enabling you to build reactive, event-driven workflows that can accommodate changes in datasets, either between Airflow environments or in external systems. Let’s go through some of these new capabilities.

Logical operators and conditional expressions for DAG scheduling

Prior to the introduction of this capability, users faced significant limitations when working with complex scheduling scenarios involving multiple datasets. Airflow’s scheduling capabilities were restricted to logical AND combinations of datasets, meaning that a DAG run would only be created after all specified datasets were updated since the last run. This rigid approach posed challenges for workflows that required more nuanced triggering conditions, such as running a DAG when any one of several datasets was updated or when specific combinations of dataset updates occurred.

With the release of Airflow 2.9.2, you can now use logical operators (AND and OR) and conditional expressions to define intricate scheduling conditions based on dataset updates. This feature allows for granular control over workflow triggers, enabling DAGs to be scheduled whenever a specific dataset or combination of datasets is updated.

For example, in the financial services industry, a risk management process might need to be run whenever trading data from any regional market is refreshed, or when both trading and regulatory updates are available. The new scheduling capabilities available in Amazon MWAA allow you to express such complex logic using simple expressions. The following diagram illustrates the dependency we need to establish.

The following DAG code contains the logical operations to implement these dependencies:

from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime

trading_data_asia = Dataset("s3://trading/asia/data.parquet")
trading_data_europe = Dataset("s3://trading/europe/data.parquet")
trading_data_americas = Dataset("s3://trading/americas/data.parquet")
regulatory_updates = Dataset("s3://regulators/updates.json")

@dag(
    dag_id='risk_management_trading_data',
    start_date=datetime(2023, 5, 1),
    # Run when any regional trading dataset AND the regulatory dataset are updated
    schedule=((trading_data_asia | trading_data_europe | trading_data_americas) & regulatory_updates),
    catchup=False
)
def risk_management_pipeline():
    @task
    def risk_analysis():
        # Task for risk analysis
        ...

    @task
    def reporting():
        # Task for reporting
        ...

    @task
    def notifications():
        # Task for notifications
        ...

    analysis = risk_analysis()
    report = reporting()
    notify = notifications()

risk_management_pipeline()

To learn more about this feature, refer to Logical operators for datasets in the Airflow documentation.
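
To build intuition for how an expression such as (asia | europe | americas) & regulatory behaves, the following plain-Python sketch mimics the | and & composition used in the schedule argument. It does not use Airflow: the Condition, Uri, AnyOf, and AllOf classes are hypothetical stand-ins introduced here for illustration, and the real scheduler tracks dataset events per DAG rather than a simple set of URIs.

```python
from dataclasses import dataclass


class Condition:
    """Hypothetical base class that gives every condition the | and & operators."""

    def __or__(self, other: "Condition") -> "Condition":
        return AnyOf(self, other)

    def __and__(self, other: "Condition") -> "Condition":
        return AllOf(self, other)

    def evaluate(self, updated: set) -> bool:
        raise NotImplementedError


@dataclass(frozen=True)
class Uri(Condition):
    """Stand-in for a dataset, identified only by its URI."""

    uri: str

    def evaluate(self, updated: set) -> bool:
        return self.uri in updated


class AnyOf(Condition):
    """Satisfied when at least one child condition is satisfied (logical OR)."""

    def __init__(self, *children: Condition) -> None:
        self.children = children

    def evaluate(self, updated: set) -> bool:
        return any(child.evaluate(updated) for child in self.children)


class AllOf(Condition):
    """Satisfied only when every child condition is satisfied (logical AND)."""

    def __init__(self, *children: Condition) -> None:
        self.children = children

    def evaluate(self, updated: set) -> bool:
        return all(child.evaluate(updated) for child in self.children)


asia = Uri("s3://trading/asia/data.parquet")
europe = Uri("s3://trading/europe/data.parquet")
americas = Uri("s3://trading/americas/data.parquet")
regulatory = Uri("s3://regulators/updates.json")

# Same shape as the schedule expression in the DAG example
condition = (asia | europe | americas) & regulatory

print(condition.evaluate({europe.uri}))                  # trading update only
print(condition.evaluate({europe.uri, regulatory.uri}))  # trading and regulatory updates
```

In this sketch, the condition is satisfied only after at least one regional trading dataset and the regulatory dataset have both been updated.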

Combining dataset and time-based schedules

With Airflow 2.9.2 environments, Amazon MWAA now has a more comprehensive scheduling mechanism that combines the flexibility of data-driven execution with the consistency of time-based schedules.

Consider a scenario where your team is responsible for managing a data pipeline that generates daily sales reports. This pipeline relies on data from multiple sources. Although it’s essential to generate these sales reports daily to provide timely insights to business stakeholders, you also need to make sure the reports are up to date and reflect important data changes as soon as possible. For instance, if there’s a significant influx of orders during a promotional campaign, or if inventory levels change unexpectedly, the report should incorporate these updates to maintain relevance.

Relying solely on time-based scheduling for this type of data pipeline can lead to potential issues such as outdated information and infrastructure resource wastage.

The DatasetOrTimeSchedule feature introduced in Airflow 2.9 adds the capability to combine conditional dataset expressions with time-based schedules. This means that your workflow can be invoked not only at predefined intervals but also whenever there are updates to the specified datasets, with the specific dependency relationship among them. The following diagram illustrates how you can use this capability to accommodate such scenarios.

See the following DAG code for an example implementation:

from airflow.decorators import dag, task
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.datasets import Dataset
from datetime import datetime

# Define datasets
orders_dataset = Dataset("s3://path/to/orders/data")
inventory_dataset = Dataset("s3://path/to/inventory/data")
customer_dataset = Dataset("s3://path/to/customer/data")

# Combine datasets using logical operators
combined_dataset = (orders_dataset & inventory_dataset) | customer_dataset

@dag(
    dag_id="dataset_time_scheduling",
    start_date=datetime(2024, 1, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),  # Daily at midnight
        datasets=combined_dataset
    ),
    catchup=False,
)
def dataset_time_scheduling_pipeline():
    @task
    def process_orders():
        # Task logic for processing orders
        pass

    @task
    def update_inventory():
        # Task logic for updating inventory
        pass

    @task
    def update_customer_data():
        # Task logic for updating customer data
        pass

    orders_task = process_orders()
    inventory_task = update_inventory()
    customer_task = update_customer_data()

dataset_time_scheduling_pipeline()

In the example, the DAG will be run under two conditions:

  • When the time-based schedule is met (daily at midnight UTC)
  • When the combined dataset condition is met: there are updates to both orders and inventory data, or there are updates to customer data, regardless of the other datasets

This flexibility enables you to create sophisticated scheduling rules that cater to the unique requirements of your data pipelines, so they run when necessary and incorporate the latest data updates from multiple sources.
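
The two trigger conditions can be written down as a small decision function. The following is a simplified plain-Python sketch, not Airflow's timetable logic: the function names and the use of short dataset labels ("orders", "inventory", "customer") are assumptions made for illustration, and the sketch ignores details such as how dataset events are reset after a run.

```python
from datetime import datetime, timezone


def dataset_condition_met(updated: set) -> bool:
    """Mirror (orders & inventory) | customer over a set of updated dataset labels."""
    return {"orders", "inventory"} <= updated or "customer" in updated


def should_trigger(now: datetime, next_cron_run: datetime, updated: set) -> bool:
    """Run when the cron time has been reached OR the dataset condition holds."""
    return now >= next_cron_run or dataset_condition_met(updated)


midnight = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)
evening = datetime(2024, 1, 1, 18, 0, tzinfo=timezone.utc)

print(should_trigger(evening, midnight, {"orders"}))               # orders alone is not enough
print(should_trigger(evening, midnight, {"orders", "inventory"}))  # both halves of the AND
print(should_trigger(midnight, midnight, set()))                   # time-based schedule reached
```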

For more details on data-aware scheduling, refer to Data-aware scheduling in the Airflow documentation.

Dataset event REST API endpoints

Prior to the introduction of this feature, making your Airflow environment aware of changes to datasets in external systems was a challenge: there was no option to mark a dataset as externally updated. With the new dataset event endpoints feature, you can programmatically initiate dataset-related events. The REST API has endpoints to create, list, and delete dataset events.

This capability enables external systems and applications to seamlessly integrate and interact with your Amazon MWAA environment. It significantly improves your ability to expand your data pipeline’s capacity for dynamic data management.

For example, running the following code from an external system allows you to invoke a dataset event in the target Amazon MWAA environment. This event could then be handled by downstream processes or workflows, enabling greater connectivity and responsiveness in data-driven workflows that rely on timely data updates and interactions.

curl -X POST https://{web_server_host_name}/api/v1/datasets/events \
   -H 'Content-Type: application/json' \
   -d '{"dataset_uri": "s3://bucket_name/bucket_key", "extra": { }}'

The following diagram illustrates how the different components in the scenario interact with each other.

To get more details on how to use the Airflow REST API in Amazon MWAA, refer to Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling. To learn more about the dataset event REST API endpoints, refer to Dataset UI Enhancements in the Airflow documentation.
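
If the external system is written in Python, the same request can be assembled with the standard library. The following sketch only builds the request object and does not send it. The helper name build_dataset_event_request and the host name are hypothetical, and authentication is deliberately left out because it depends on how your environment's web server is accessed.

```python
import json
import urllib.request
from typing import Optional


def build_dataset_event_request(
    web_server_host_name: str,
    dataset_uri: str,
    extra: Optional[dict] = None,
) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates a dataset event."""
    payload = {"dataset_uri": dataset_uri, "extra": extra or {}}
    return urllib.request.Request(
        url=f"https://{web_server_host_name}/api/v1/datasets/events",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder host name; replace it with your environment's web server host name
request = build_dataset_event_request(
    "example-webserver.invalid", "s3://bucket_name/bucket_key"
)
print(request.get_method(), request.full_url)
```

Sending the request would be a call to urllib.request.urlopen(request), after adding whatever session or token headers your environment requires.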

Airflow 2.9.2 also includes features to ease the operation and monitoring of your environments. Let’s explore some of these new capabilities.

DAG auto-pausing

Customers are using Amazon MWAA to build complex data pipelines with multiple interconnected tasks and dependencies. When one of these pipelines encounters an issue or failure, it can result in a cascade of unnecessary and redundant task runs, leading to wasted resources. This problem is particularly prevalent in scenarios where pipelines run at frequent intervals, such as hourly or daily. A common scenario is a critical pipeline that starts failing during the evening and, because of the failure, continues to run and fail repeatedly until someone manually intervenes the next morning. This can result in dozens of unnecessary tasks, consuming valuable compute resources and potentially causing data corruption or inconsistencies.

The DAG auto-pausing feature aims to address this issue by introducing two new configuration parameters:

  • max_consecutive_failed_dag_runs_per_dag – This is a global Airflow configuration setting. It allows you to specify the maximum number of consecutive failed DAG runs before the DAG is automatically paused.
  • max_consecutive_failed_dag_runs – This is a DAG-level argument. It overrides the previous global configuration, allowing you to set a custom threshold for each DAG.

In the following code example, we define a DAG with a single Python task created with the @task decorator. The failing_task is designed to fail by raising a ValueError. The key configuration for DAG auto-pausing is the max_consecutive_failed_dag_runs parameter set in the DAG object. By setting max_consecutive_failed_dag_runs=3, we’re instructing Airflow to automatically pause the DAG after it fails three consecutive times.

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@task
def failing_task():
    raise ValueError("This task is designed to fail")

@dag(
    dag_id="auto_pause",
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(minutes=1),  # Run every minute
    catchup=False,
    max_consecutive_failed_dag_runs=3,  # Set the maximum number of consecutive failed DAG runs
)
def example_dag_with_auto_pause():
    failing_task_instance = failing_task()

example_dag_with_auto_pause()

With this parameter, you can now configure your Airflow DAGs to automatically pause after a specified number of consecutive failures.

To learn more, refer to DAG Auto-pausing in the Airflow documentation.
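
The interaction between the global setting and the DAG-level argument can be illustrated with a short plain-Python sketch. This is not Airflow's implementation: the function name should_auto_pause and the state strings are hypothetical, and treating a threshold of 0 as "auto-pausing disabled" is an assumption of this sketch.

```python
from typing import Optional, Sequence


def should_auto_pause(
    recent_run_states: Sequence[str],
    dag_level_max: Optional[int] = None,
    global_max: int = 0,
) -> bool:
    """Return True when the most recent runs are all failures up to the threshold.

    recent_run_states is ordered oldest to newest. The DAG-level value, when
    set, takes precedence over the global value. A threshold of 0 disables
    auto-pausing in this sketch.
    """
    threshold = dag_level_max if dag_level_max is not None else global_max
    if threshold <= 0 or len(recent_run_states) < threshold:
        return False
    return all(state == "failed" for state in recent_run_states[-threshold:])


# Three failures in a row reach the DAG-level threshold of 3
print(should_auto_pause(["success", "failed", "failed", "failed"], dag_level_max=3))
# A success in between resets the streak
print(should_auto_pause(["failed", "success", "failed"], dag_level_max=3))
```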

CLI support for bulk pause and resume of DAGs

As the number of DAGs in your environment grows, managing them becomes increasingly challenging. Whether for upgrading or migrating environments, or other operational activities, you may need to pause or resume multiple DAGs. This process can become a daunting, repetitive endeavor because you need to navigate through the Airflow UI, manually pausing or resuming DAGs one at a time. These manual actions are time consuming and increase the risk of human error that can result in missteps and lead to data inconsistencies or pipeline disruptions. The previous CLI commands for pausing and resuming DAGs could only handle one DAG at a time, making them inefficient.

Airflow 2.9.2 improves these CLI commands by adding the capability to treat DAG IDs as regular expressions, allowing you to pause or resume multiple DAGs with a single command. This new feature eliminates the need for repetitive manual intervention or individual DAG operations, significantly reducing the risk of human error and providing reliability and consistency in your data pipelines.

For example, to pause all DAGs generating daily liquidity reporting using Amazon Redshift as a data source, you can use the following CLI command with a regular expression:

airflow dags pause --treat-dag-id-as-regex -y "^(redshift|daily_liquidity_reporting)"
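
Before running a bulk pause, it can help to check which DAG IDs a pattern would select. The following sketch applies the same regular expression with Python's re module to a hypothetical list of DAG IDs. The IDs are made up for illustration, and the use of re.search is an assumption about matching behavior; because the pattern is anchored with ^, only IDs that start with redshift or daily_liquidity_reporting are selected.

```python
import re

pattern = re.compile(r"^(redshift|daily_liquidity_reporting)")

# Hypothetical DAG IDs, used only to illustrate what the pattern selects
dag_ids = [
    "redshift_unload_positions",
    "daily_liquidity_reporting_emea",
    "weekly_liquidity_reporting",
    "load_redshift_staging",
]

matched = [dag_id for dag_id in dag_ids if pattern.search(dag_id)]
print(matched)
```

Note that the alternation matches either prefix, so an ID does not need to contain both terms to be selected.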

Custom names for Dynamic Task Mapping

Dynamic Task Mapping was added in Airflow 2.3. This powerful feature allows workflows to create tasks dynamically at runtime based on data. Instead of relying on the DAG author to predict the number of tasks needed in advance, the scheduler can generate the appropriate number of copies of a task based on the output of a previous task. Of course, with great power comes great responsibility. By default, dynamically mapped tasks were assigned numeric indexes as names. In complex workflows involving high numbers of mapped tasks, it becomes increasingly challenging to pinpoint the specific tasks that require attention, leading to potential delays and inefficiencies in managing and maintaining your data workflows.

Airflow 2.9 introduces the map_index_template parameter, a highly requested feature that addresses the challenge of task identification in Dynamic Task Mapping. With this capability, you can now provide custom names for your dynamically mapped tasks, enhancing visibility and manageability within the Airflow UI.

See the following example:

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data(data):
    # Perform data processing logic here
    print(f"Processing data: {data}")

@dag(
    dag_id="custom_task_mapping_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)
def custom_task_mapping_example():
    mapped_processes = PythonOperator.partial(
        task_id="process_data_source",
        python_callable=process_data,
        # Rendered once per mapped task instance to produce its display name
        map_index_template="Processing source={{ task.op_args[0] }}",
    ).expand(op_args=[["source_a"], ["source_b"], ["source_c"]])

custom_task_mapping_example()

The key aspect in the code is the map_index_template parameter specified in the PythonOperator.partial call. This Jinja template instructs Airflow to use the first value of each task’s op_args argument in the map index label of each dynamically mapped task instance. In the Airflow UI, you will see three task instances labeled with source_a, source_b, and source_c instead of numeric indexes, making it straightforward to identify and track the tasks associated with each data source. In case of failures, this capability improves monitoring and troubleshooting.

The map_index_template feature goes beyond simple template rendering, offering dynamic injection capabilities into the rendering context. This functionality unlocks greater levels of flexibility and customization when naming dynamically mapped tasks.

Refer to Named mapping in the Airflow documentation to learn more about named mapping.
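
To preview the labels that a template such as "Processing source={{ task.op_args[0] }}" would yield for each mapped task instance, the following plain-Python imitation substitutes the first op_args value into the template. It is only an illustration: Airflow renders the template with Jinja against the task's context, whereas the hypothetical render_map_index helper below uses a simple string replacement.

```python
def render_map_index(template: str, op_args: list) -> str:
    """Imitate the rendering of one specific placeholder; this is not a Jinja engine."""
    return template.replace("{{ task.op_args[0] }}", str(op_args[0]))


template = "Processing source={{ task.op_args[0] }}"
expanded_op_args = [["source_a"], ["source_b"], ["source_c"]]

# One label per mapped task instance, in expansion order
labels = [render_map_index(template, args) for args in expanded_op_args]
for label in labels:
    print(label)
```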

TaskFlow decorator for Bash commands

Writing complex Bash commands and scripts using the traditional Airflow BashOperator may bring challenges in areas such as code consistency, task dependencies definition, and dynamic command generation. The new @task.bash decorator addresses these challenges, allowing you to define Bash statements using Python functions, making the code more readable and maintainable. It seamlessly integrates with Airflow’s TaskFlow API, enabling you to define dependencies between tasks and create complex workflows. You can also use Airflow’s scheduling and monitoring capabilities while maintaining a consistent coding style.

The following sample code showcases how the @task.bash decorator simplifies the integration of Bash commands into DAGs, while using the full capabilities of Python for dynamic command generation and data processing:

from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Sample customer data
customer_data = """
id,name,age,city
1,John Doe,35,New York
2,Jane Smith,42,Los Angeles
3,Michael Johnson,28,Chicago
4,Emily Williams,31,Houston
5,David Brown,47,Phoenix
"""

# Sample order data
order_data = """
order_id,customer_id,product,quantity,price
101,1,Product A,2,19.99
102,2,Product B,1,29.99
103,3,Product A,3,19.99
104,4,Product C,2,14.99
105,5,Product B,1,29.99
"""

@dag(
    dag_id='task-bash-customer_order_analysis',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
)
def customer_order_analysis_dag():
    @task.bash
    def clean_data():
        # Clean customer data: switch the delimiter to ';' and drop the first line.
        # A single pipeline avoids reading and truncating the same file in one command.
        customer_cleaning_commands = """
            echo '{}' | sed 's/,/;/g' | awk 'NR > 1' > cleaned_customers.csv
        """.format(customer_data)

        # Clean order data in the same way
        order_cleaning_commands = """
            echo '{}' | sed 's/,/;/g' | awk 'NR > 1' > cleaned_orders.csv
        """.format(order_data)

        return customer_cleaning_commands + "\n" + order_cleaning_commands

    @task.bash
    def transform_data(cleaned_customers, cleaned_orders):
        # Transform customer data
        customer_transform_commands = """
            cat {cleaned_customers} | awk -F';' '{{printf "%s,%s,%s\\n", $1, $2, $3}}' > transformed_customers.csv
        """.format(cleaned_customers=cleaned_customers)

        # Transform order data
        order_transform_commands = """
            cat {cleaned_orders} | awk -F';' '{{printf "%s,%s,%s,%s,%s\\n", $1, $2, $3, $4, $5}}' > transformed_orders.csv
        """.format(cleaned_orders=cleaned_orders)

        return customer_transform_commands + "\n" + order_transform_commands

    @task.bash
    def analyze_data(transformed_customers, transformed_orders):
        analysis_commands = """
            # Calculate total revenue
            total_revenue=$(awk -F',' '{{sum += $5 * $4}} END {{printf "%.2f", sum}}' {transformed_orders})
            echo "Total revenue: $total_revenue"

            # Find customers with multiple orders
            customers_with_multiple_orders=$(
                awk -F',' '{{orders[$2]++}} END {{for (c in orders) if (orders[c] > 1) printf "%s,", c}}' {transformed_orders}
            )
            echo "Customers with multiple orders: $customers_with_multiple_orders"

            # Find most popular product
            popular_product=$(
                awk -F',' '{{products[$3]++}} END {{max=0; for (p in products) if (products[p] > max) {{max=products[p]; popular=p}}}} END {{print popular}}' {transformed_orders}
            )
            echo "Most popular product: $popular_product"
        """.format(transformed_customers=transformed_customers, transformed_orders=transformed_orders)

        return analysis_commands

    cleaned_data = clean_data()
    transformed_data = transform_data(cleaned_data, cleaned_data)
    analysis_results = analyze_data(transformed_data, transformed_data)

customer_order_analysis_dag()

You can learn more about the @task.bash decorator in the Airflow documentation.
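
When Python code generates Bash commands dynamically, file names and embedded programs are easier to get right if they are quoted programmatically. The following sketch shows one way to build an awk command string of the kind returned from a @task.bash function, using shlex.quote from the standard library. The helper name build_transform_command and the file names are hypothetical, and the sketch only builds the string; it does not run it or depend on Airflow.

```python
import shlex


def build_transform_command(input_path: str, output_path: str, columns: int) -> str:
    """Build an awk command that rewrites the first N ';'-separated fields as CSV."""
    fields = ", ".join(f"${i}" for i in range(1, columns + 1))
    # "\\n" keeps a literal backslash-n so that awk, not Python, emits the newline
    fmt = ",".join(["%s"] * columns) + "\\n"
    awk_program = f'{{printf "{fmt}", {fields}}}'
    return (
        f"awk -F';' {shlex.quote(awk_program)} "
        f"{shlex.quote(input_path)} > {shlex.quote(output_path)}"
    )


command = build_transform_command("cleaned customers.csv", "out.csv", columns=3)
print(command)
```

Quoting matters most for values that come from data or configuration, such as a path that contains a space, because an unquoted value would be split into several arguments by the shell.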

Set up a new Airflow 2.9.2 environment in Amazon MWAA

You can initiate the setup in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

Upon successful creation of an Airflow 2.9 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.

Upgrade from older versions of Airflow to version 2.9.2

You can take advantage of these latest capabilities by upgrading your older Airflow version 2.x-based environments to version 2.9 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.

Conclusion

In this post, we announced the availability of Apache Airflow 2.9 environments in Amazon MWAA. We discussed how some of the latest features added in the release enable you to design more reactive, event-driven workflows, such as DAG scheduling based on the result of logical operations, and the availability of endpoints in the REST API to programmatically create dataset events. We also provided some sample code to show the implementation in Amazon MWAA.

For the complete list of changes, refer to Airflow’s release notes. For more details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the authors

Hernan Garcia is a Senior Solutions Architect at AWS, based out of Amsterdam, working with enterprises in the Financial Services Industry. He specializes in application modernization and helps customers in the adoption of serverless technologies.

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.
