Detect and handle data skew on AWS Glue


AWS Glue is a fully managed, serverless data integration service provided by Amazon Web Services (AWS) that uses Apache Spark as one of its backend processing engines (as of this writing, you can use Python Shell, Spark, or Ray).

Data skew occurs when the data being processed is not evenly distributed across the Spark cluster, causing some tasks to take significantly longer to complete than others. This can lead to inefficient resource utilization, longer processing times, and ultimately, slower performance. Data skew can arise from various factors, including uneven data distribution, skewed join keys, or uneven data processing patterns.

The biggest problem is often nodes running out of disk during shuffling, which leads to nodes falling like dominoes and job failures. It's also important to note that data skew is hidden. It can go undetected because monitoring tools might not flag an uneven distribution as a critical issue, and logs don't always make it evident. As a result, a developer may observe that their AWS Glue jobs are completing without apparent errors, yet the system could be operating far from its optimal efficiency. This hidden inefficiency not only increases operational costs due to longer runtimes but can also lead to unpredictable performance issues that are difficult to diagnose without a deep dive into the data distribution and task run patterns.

For example, in a dataset of customer transactions, if one customer has significantly more transactions than the others, it can cause a skew in the data distribution.

Identifying and handling data skew issues is key to having good performance on Apache Spark and therefore on AWS Glue jobs that use Spark as a backend. In this post, we show how you can identify data skew and discuss the different techniques to mitigate it.

Detect data skew

When an AWS Glue job has issues with local disks (split disk issues), doesn't scale with the number of workers, or has low CPU usage (you can enable Amazon CloudWatch metrics for your job to be able to see this), you may have a data skew issue. You can detect data skew with data analysis or by using the Spark UI. In this section, we discuss how to use the Spark UI.

The Spark UI provides a comprehensive view of Spark applications, including the number of tasks, stages, and their duration. To use it, you need to enable Spark UI event logs for your job runs. This is enabled by default on the AWS Glue console, and once enabled, Spark event log files are created during the job run and stored in your S3 bucket. Then, those logs are parsed, and you can use the AWS Glue serverless Spark UI to visualize them. You can refer to this blog post for more details. For jobs where the AWS Glue serverless Spark UI doesn't work because it has a limit of 512 MB of logs, you can set up the Spark UI using an EC2 instance.

You can use the Spark UI to identify which tasks are taking longer to complete than others, and whether the data distribution among partitions is balanced (remember that in Spark, one partition is mapped to one task). If there is data skew, you will see that some partitions have significantly more data than others. The following figure shows an example of this. We can see that one task is taking a lot more time than the others, which can indicate data skew.

Another thing that you can use is the summary metrics for each stage. The following screenshot shows another example of data skew.

These metrics represent the task-related metrics below which a certain percentage of tasks completed. For example, the 75th percentile task duration indicates that 75% of tasks completed in less time than this value. When the tasks are evenly distributed, you will see similar numbers in all the percentiles. When there is data skew, you will see very biased values in each percentile. In the preceding example, tasks didn't write many shuffle files (less than 50 MiB) in Min, 25th percentile, Median, and 75th percentile. However, in Max, a task wrote 460 MiB, 10 times the 75th percentile. This means there was at least one task (or up to 25% of tasks) that wrote much bigger shuffle files than the rest of the tasks. You can also see that the duration of the task in Max is 46 seconds and the Median is 2 seconds. These are all indicators that your dataset may have data skew.
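The comparison between Max and Median described above can be expressed as a single ratio. The following is a minimal sketch in plain Python, assuming you have copied per-task values (durations or shuffle write sizes) out of the Spark UI; the function name `skew_ratio` and the sample durations are hypothetical and only mirror the shape of the example (a median of 2 seconds and a maximum of 46 seconds).

```python
from statistics import median


def skew_ratio(task_values):
    """Return max / median for a list of per-task metrics (durations, shuffle bytes)."""
    mid = median(task_values)
    if mid == 0:
        # Avoid dividing by zero when most tasks report no work at all
        return float("inf") if max(task_values) > 0 else 1.0
    return max(task_values) / mid


# Hypothetical per-task durations in seconds: most tasks take about 2 seconds,
# and a single straggler takes 46 seconds
durations = [1, 2, 2, 2, 2, 2, 3, 46]
ratio = skew_ratio(durations)
print(f"max/median duration ratio: {ratio:.1f}")
```

A ratio close to 1 suggests evenly sized tasks. How large a ratio should be before you investigate is a judgment call that depends on the job.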

AWS Glue interactive sessions

You can use interactive sessions to load your data from the AWS Glue Data Catalog, or just use Spark methods to load the files, such as Parquet or CSV, that you want to analyze. You can use a script similar to the following to detect data skew from the partition size perspective. Note that the more important issue is data skew while shuffling, and this script does not detect that kind of skew:

from pyspark.sql.functions import spark_partition_id, asc, abs as sql_abs

# input_dataframe is the DataFrame you want to check for data skew
partition_sizes_df = (
    input_dataframe
    .withColumn("partitionId", spark_partition_id())
    .groupBy("partitionId")
    .count()
    .orderBy(asc("count"))
    .withColumnRenamed("count", "partition_size")
)

# Calculate the average and standard deviation of the partition sizes
avg_size = partition_sizes_df.agg({"partition_size": "avg"}).collect()[0][0]
# stddev is null when there is a single partition, so fall back to 0.0
std_dev_size = partition_sizes_df.agg({"partition_size": "stddev"}).collect()[0][0] or 0.0

# Mark a partition as skewed when the absolute difference between its size and
# the average (avg_size) is greater than twice the standard deviation (std_dev_size)
skewed_partitions_df = partition_sizes_df.filter(
    sql_abs(partition_sizes_df["partition_size"] - avg_size) > 2 * std_dev_size
)
skewed_partitions = [row["partitionId"] for row in skewed_partitions_df.collect()]
if skewed_partitions:
    print(f"The following partitions have significantly different sizes: {skewed_partitions}")
else:
    print("No data skew detected.")

The script calculates the average and standard deviation of partition sizes using the agg() function and identifies partitions with significantly different sizes using the filter() function. It prints their indexes if any skewed partitions are detected. Otherwise, it prints that no data skew is detected.

This code assumes that your data is structured, and you may need to modify it if your data is of a different type.
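Because the partition-size check does not reveal skew that appears during shuffling, it can also help to look at how often each join or grouping key occurs. The following is a minimal sketch in plain Python over a sample of key values; the function name `find_heavy_keys`, the `factor` of 5, and the sample data are assumptions for illustration, not values from this post. On a Spark DataFrame, the per-key counts would come from grouping by the key column and counting.

```python
from collections import Counter


def find_heavy_keys(keys, factor=5.0):
    """Return {key: count} for keys whose count exceeds factor * mean count per key."""
    counts = Counter(keys)
    if not counts:
        return {}
    mean_count = sum(counts.values()) / len(counts)
    return {key: count for key, count in counts.items() if count > factor * mean_count}


# Hypothetical sample: one customer owns 960 of 1,000 transactions
sample_keys = ["customer_1"] * 960 + [f"customer_{i}" for i in range(2, 42)]
heavy = find_heavy_keys(sample_keys)
print(heavy)
```

Keys returned by a check like this are candidates for the techniques described in the next section.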

Handle data skew

You can use different techniques in AWS Glue to handle data skew; there is no single universal solution. The first thing to do is confirm that you're using the latest AWS Glue version. For example, AWS Glue 4.0, based on Spark 3.3, enables by default some configurations such as Adaptive Query Execution (AQE) that can help improve performance when data skew is present.

The following are some of the techniques that you can use to handle data skew:

  • Filter and perform – If you know which keys are causing the skew, you can filter them out, perform your operations on the non-skewed data, and then handle the skewed keys separately.
  • Implementing incremental aggregation – If you are performing a large aggregation operation, you can break it up into smaller stages, because in large datasets a single aggregation operation (like sum, average, or count) can be resource-intensive. In these cases, you can perform intermediate actions. This could involve filtering, grouping, or additional aggregations. This can help distribute the workload across the nodes and reduce the size of intermediate data.
  • Using a custom partitioner – If your data has a specific structure or distribution, you can create a custom partitioner that partitions your data based on its characteristics. This can help make sure that data with similar characteristics is in the same partition and reduce the size of the largest partition.
  • Using broadcast join – If your dataset is small but exceeds the spark.sql.autoBroadcastJoinThreshold value (default is 10 MB), you have the option to either provide a hint to use broadcast join or adjust the threshold value to accommodate your dataset. This can be an effective way to optimize join operations and mitigate data skew issues resulting from shuffling large amounts of data across nodes.
  • Salting – This involves adding a random salt (for example, a prefix or suffix) to the key of skewed data. By doing this, you distribute the data more evenly across the partitions. After processing, you can remove the salt to get the original key values.

These are just a few techniques to handle data skew in PySpark; the best approach will depend on the characteristics of your data and the operations you are performing.
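To make the incremental aggregation idea concrete, the following plain-Python sketch sums values in two stages: first per (key, salt) pair, so a hot key is split into several smaller groups, and then per original key. It is a simulation under stated assumptions rather than Spark code; the function name `two_stage_sum`, the seed, and the row counts are hypothetical.

```python
import random
from collections import defaultdict


def two_stage_sum(rows, num_salts, seed=42):
    """Sum values per key in two stages, splitting each key into up to num_salts groups."""
    rng = random.Random(seed)

    # Stage 1: partial sums keyed by (key, salt), so no single group holds a whole hot key
    partial = defaultdict(float)
    for key, value in rows:
        partial[(key, rng.randrange(num_salts))] += value

    # Stage 2: drop the salt and combine the (much smaller) set of partial sums
    final = defaultdict(float)
    for (key, _salt), subtotal in partial.items():
        final[key] += subtotal
    return dict(final), len(partial)


# Hypothetical input: key2 is heavily skewed
rows = [("key2", 1.0)] * 9000 + [("key1", 1.0)] * 10 + [("key3", 1.0)] * 10
totals, num_partials = two_stage_sum(rows, num_salts=3)
print(totals, num_partials)
```

The second stage only has to combine at most `num_salts` partial results per key, which is where the reduction in intermediate data comes from.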

The following is an example of joining skewed data with the salting technique:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, floor, rand, concat, col

spark = SparkSession.builder.getOrCreate()

# Assumed to be defined before this point: skewed_data and non_skewed_data
# (the two DataFrames to join on "key") and skew_threshold (the row count
# above which a key is treated as skewed)

# Define the number of salt values
num_salts = 3

# Function to identify skewed keys
def identify_skewed_keys(df, key_column, threshold):
    key_counts = df.groupBy(key_column).count()
    return key_counts.filter(key_counts["count"] > threshold).select(key_column)

# Identify skewed keys
skewed_keys = identify_skewed_keys(skewed_data, "key", skew_threshold)

# Split the dataset into rows with skewed keys and rows with regular keys
skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "inner")
non_skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "left_anti")

# Apply salting to skewed data: an integer salt in [0, num_salts) with a fixed seed
skewed_data_subset = skewed_data_subset.withColumn("salt", floor(rand(seed=42) * num_salts))
skewed_data_subset = skewed_data_subset.withColumn(
    "salted_key", concat(col("key"), lit("_"), col("salt").cast("string"))
).drop("key", "salt")

# Replicate the rows of the other dataset that match a skewed key, once per salt value
def replicate_skewed_rows(df, keys, multiplier):
    replicated_df = df.join(keys, ["key"]).crossJoin(spark.range(multiplier).withColumnRenamed("id", "salt"))
    replicated_df = replicated_df.withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))
    return replicated_df.drop("salt")

replicated_non_skewed_data = replicate_skewed_rows(non_skewed_data, skewed_keys, num_salts)

# Perform the JOIN operation on the salted keys for skewed data
result_skewed = skewed_data_subset.join(replicated_non_skewed_data, "salted_key").drop("salted_key")

# Perform a regular join on non-skewed data
result_non_skewed = non_skewed_data_subset.join(non_skewed_data, "key")

# Combine results (matching columns by name, because the column order differs)
final_result = result_skewed.unionByName(result_non_skewed)

In this code, we first define the number of salt values (num_salts) and identify the skewed keys. We then add a salt column to the skewed subset of the DataFrame using the withColumn() function, setting its value to a random integer derived from the rand() function, which can take a fixed seed. The function replicate_skewed_rows replicates each row of the other dataset (non_skewed_data) that matches a skewed key num_salts times, once per salt value. This makes sure that every salted key in the skewed data has a matching salted key on the other side of the join. Finally, a join operation is performed on the salted_key column between the two datasets. This join is more balanced than a direct join on the original key, because salting and replication have mitigated the data skew.

The rand() function used in this example generates a random number between 0 and 1 for each row, so it's important to use a fixed seed to achieve consistent results across different runs of the code. You can choose any fixed integer value for the seed.

The following figures illustrate the data distribution before (left) and after (right) salting. The heavily skewed key2 is identified and salted into key2_0, key2_1, and key2_2, balancing the data distribution and preventing any single node from being overloaded. After processing, the results can be aggregated back, so that the final output is consistent with the unsalted key values.
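The effect shown in the figures can be reproduced on a small scale. The following plain-Python sketch counts rows per key before and after salting a single hot key; the row counts, the seed, and the helper name `salt_key` are hypothetical, and only the key names follow the figures.

```python
import random
from collections import Counter


def salt_key(key, num_salts, rng):
    """Append a random salt in [0, num_salts) to the key, for example key2 -> key2_1."""
    return f"{key}_{rng.randrange(num_salts)}"


rng = random.Random(42)  # fixed seed so that repeated runs give the same split
keys = ["key1"] * 100 + ["key2"] * 9000 + ["key3"] * 100

before = Counter(keys)
after = Counter(salt_key(k, 3, rng) if k == "key2" else k for k in keys)

largest_before = max(before.values())
largest_after = max(after.values())
print(f"largest group before salting: {largest_before}, after salting: {largest_after}")
```

The total number of rows is unchanged; only the size of the largest group shrinks, which is what reduces the load on the busiest task.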

Other techniques to use on skewed data during the join operation

If you are performing skewed joins, you can use salting or broadcasting techniques, or divide your data into skewed and regular parts before joining the regular data and broadcasting the skewed data.

If you are using Spark 3, there are automatic optimizations that try to mitigate data skew issues on joins. These can be tuned because they have dedicated configurations in Apache Spark.
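As a sketch of what that tuning can look like, the following snippet collects the Adaptive Query Execution settings related to skewed joins and applies them to a session. The configuration names are Apache Spark SQL settings; the values shown are illustrative assumptions that you would adjust for your workload, and the helper name `apply_settings` is hypothetical.

```python
# Spark SQL settings related to skew handling in Adaptive Query Execution.
# The values are illustrative; check the defaults for your Spark version.
SKEW_JOIN_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # A partition is considered skewed if it is larger than this factor times
    # the median partition size and also larger than the threshold below
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
}


def apply_settings(spark, settings):
    """Apply each setting through spark.conf.set on the given SparkSession."""
    for name, value in settings.items():
        spark.conf.set(name, value)
```

In a job script, you would call `apply_settings(spark, SKEW_JOIN_SETTINGS)` with your existing SparkSession before running the join.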

Conclusion

This post provided details on how to detect data skew in your data integration jobs using AWS Glue and different techniques for handling it. Having a good data distribution is key to achieving the best performance on distributed processing systems like Apache Spark.

Although this post focused on AWS Glue, the same concepts apply to jobs you are running on Amazon EMR using Apache Spark or Amazon Athena for Apache Spark.

As always, AWS welcomes your feedback. Please leave your comments and questions in the comments section.


About the Authors

Salim Tutuncu is a Sr. PSA Specialist on Data & AI, based in Amsterdam, with a focus on the EMEA North and EMEA Central regions. With a rich background in the technology sector that spans roles as a Data Engineer, Data Scientist, and Machine Learning Engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses leveraging the AWS Platform, particularly in Data and AI use cases.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to Data Analytics and Artificial Intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on Data and AI.
