Optimize write throughput for Amazon Kinesis Data Streams


Amazon Kinesis Data Streams is used by many customers to capture, process, and store data streams at any scale. This level of unparalleled scale is enabled by dividing each data stream into multiple shards. Each shard in a stream has a write throughput limit of 1 MB/s or 1,000 records per second. Whether your data streaming application is collecting clickstream data from a web application or recording telemetry data from billions of Internet of Things (IoT) devices, streaming applications are highly susceptible to varying amounts of data ingestion. Sometimes such a large and unexpected volume of data is the thing we least expect. For instance, consider application logic with a retry mechanism when writing records to a Kinesis data stream. In case of a network failure, it's common to buffer data locally and write it when connectivity is restored. Depending on the rate at which data is buffered and the duration of the connectivity issue, the local buffer can accumulate enough data to saturate the available write throughput quota of a Kinesis data stream.

When an application attempts to write more data than what's allowed, it will receive write throughput exceeded errors. In some instances, not being able to handle these errors in a timely manner can result in data loss, unhappy customers, and other undesirable outcomes. In this post, we explore the typical reasons behind write throughput exceeded errors, along with methods to identify them. We then guide you on swift responses to these events and provide several solutions for mitigation. Finally, we delve into how on-demand capacity mode can be valuable in addressing these errors.

Why do we get write throughput exceeded errors?

Write throughput exceeded errors are generally caused by three different scenarios:

  • The simplest is the case where the producer application is generating more data than the total throughput available in the Kinesis data stream (the sum of all shards).
  • Next, we have the case where data distribution is not even across all shards, known as the hot shard issue.
  • Write throughput exceeded errors can also be caused by an application choosing a partition key that writes records at a rate exceeding the throughput offered by a single shard. This situation is somewhat similar to the hot shard issue, but as we see later in this post, unlike a hot shard issue, you can't solve this problem by adding more shards to the data stream. This behavior is commonly known as a hot key issue.

Before we discuss how to diagnose these issues, let's look at how Kinesis data streams organize data and its relationship to write throughput exceeded errors.

A Kinesis data stream has one or more shards to store data. Each shard is assigned a key range in the 128-bit integer space. If you view the details of a data stream using the describe-stream operation in the AWS Command Line Interface (AWS CLI), you can actually see this key range assignment:

$ aws kinesis describe-stream --stream-name my-data-stream
"StreamDescription": {
  "Shards": [
    {
      "ShardId": "shardId-000000000000",
      "HashKeyRange": {
        "StartingHashKey": "0",
        "EndingHashKey": "85070591730234615865843651857942052863"
      }
    },
    {
      "ShardId": "shardId-000000000001",
      "HashKeyRange": {
        "StartingHashKey": "85070591730234615865843651857942052864",
        "EndingHashKey": "170141183460469231731687303715884105727"
      }
    }
  ]
}

When a producer application invokes the PutRecord or PutRecords API, the service calculates an MD5 hash of the PartitionKey specified in the record. The resulting hash is used to determine which shard stores that record. You can take more control over this process by setting the ExplicitHashKey property in the PutRecord request to a hash key that falls within a specific shard's key range. For instance, setting ExplicitHashKey to 0 guarantees that the record is written to shard ID shardId-000000000000 in the stream described in the preceding code snippet.
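To make the routing concrete, the hash computation can be sketched in a few lines of Python. The two shard ranges below are illustrative (they split the 128-bit space evenly in half) rather than taken from a real stream:

```python
import hashlib

def hash_for_partition_key(partition_key: str) -> int:
    """Kinesis hashes the PartitionKey with MD5 and treats the
    16-byte digest as a big-endian 128-bit integer."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")

def shard_for_key(partition_key: str, shard_ranges) -> str:
    """shard_ranges: (shard_id, starting_hash_key, ending_hash_key) tuples,
    as reported by describe-stream."""
    h = hash_for_partition_key(partition_key)
    for shard_id, start, end in shard_ranges:
        if start <= h <= end:
            return shard_id
    raise ValueError("hash falls outside every shard range")

# Hypothetical two-shard stream splitting the 128-bit space in half
ranges = [
    ("shardId-000000000000", 0, 2**127 - 1),
    ("shardId-000000000001", 2**127, 2**128 - 1),
]
print(shard_for_key("my-url-1", ranges))
```

Because MD5 output is effectively uniform, well-distributed partition keys land evenly across shard ranges; skewed keys concentrate on whichever shard owns their hash.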

How partition keys are distributed across the available shards plays a vital role in maximizing the available throughput of a Kinesis data stream. When the partition keys in use are repeated in a way that some keys are more frequent than others, the shards storing those records will be utilized more. We get the same net effect if we use ExplicitHashKey and our logic for choosing the hash key is biased towards a subset of shards.

Imagine you have a fleet of web servers logging performance metrics for each web request served into a Kinesis data stream with two shards, and you use the request URL as the partition key. Each time a request is served, the application makes a call to the PutRecord API carrying a 10-byte record. Let's say that you have a total of 10 URLs and each receives 10 requests per second. Under these conditions, the total throughput required for the workload is 1,000 bytes per second and 100 requests per second. If we assume a perfect distribution of the 10 URLs across the two shards, each shard will receive 500 bytes per second and 50 requests per second.

Now imagine one of these URLs went viral and started receiving 1,000 requests per second. Although the situation is positive from a business perspective, you're now on the verge of making users unhappy. After the page gained popularity, you're now counting 1,040 requests per second for the shard storing the popular URL (1,000 + 4 * 10). At this point, you'll receive write throughput exceeded errors from that shard. You're throttled based on the requests-per-second quota because even with the increased request rate, you're still producing only roughly 10 KB of data per second.
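The arithmetic behind that throttling decision can be checked directly (the per-shard quotas below are the documented 1 MB/s and 1,000 records per second; the request mix matches the example above):

```python
SHARD_RECORDS_PER_SEC = 1_000      # per-shard write quota (records)
SHARD_BYTES_PER_SEC = 1_000_000    # per-shard write quota (bytes)
RECORD_SIZE = 10                   # bytes per performance-metric record

# The hot shard receives the viral URL (1,000 req/s) plus four of the
# other nine URLs at 10 req/s each.
records_per_sec = 1_000 + 4 * 10
bytes_per_sec = records_per_sec * RECORD_SIZE

print(records_per_sec)                          # 1040 -> over the record quota
print(bytes_per_sec)                            # 10400 bytes/s, far below 1 MB/s
print(records_per_sec > SHARD_RECORDS_PER_SEC)  # True  -> throttled on record count
print(bytes_per_sec > SHARD_BYTES_PER_SEC)      # False -> bytes are not the problem
```

This is why correlating the error metric with both IncomingBytes and IncomingRecords matters: only one of the two quotas is being breached here.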

You can solve this problem either by using a UUID for each request as the partition key, so that you share the total load across both shards, or by adding more shards to the Kinesis data stream. The approach you choose depends on how you want to consume data. Changing the partition key to a UUID would be problematic if you want performance metrics from a given URL to always be processed by the same consumer instance, or if you want to maintain the order of records on a per-URL basis.
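A quick simulation illustrates why random partition keys spread the load. The shard_index function below is a simplified stand-in for the MD5-based routing described earlier, assuming two equal-width hash ranges:

```python
import hashlib
import uuid

def shard_index(partition_key: str, num_shards: int = 2) -> int:
    """Approximate Kinesis routing: MD5 the key, then pick which
    equal-width slice of the 128-bit space the hash falls into."""
    h = int.from_bytes(hashlib.md5(partition_key.encode()).digest(), "big")
    return h * num_shards >> 128

counts = [0, 0]
for _ in range(10_000):
    counts[shard_index(str(uuid.uuid4()))] += 1
print(counts)  # roughly even, e.g. close to [5000, 5000]
```

With URL-based keys, the viral URL would pin all of its traffic to a single index; with UUID keys, the split stays near 50/50 at the cost of losing per-URL locality and ordering.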

Identifying the exact cause of write throughput exceeded errors is an important step in remediating them. In the next sections, we discuss how to identify the root cause and remediate this problem.

Identifying the cause of write throughput exceeded errors

The first step in solving a problem is recognizing that it exists. In this case, you can use the WriteProvisionedThroughputExceeded metric in Amazon CloudWatch. You can correlate the spikes in the WriteProvisionedThroughputExceeded metric with the IncomingBytes and IncomingRecords metrics to identify whether an application is getting throttled due to the size of data or the number of records written.

Let's look at a few tests we performed in a stream with two shards to illustrate various scenarios. In this instance, with two shards in our stream, the total throughput available to our producer application is 2 MB/s or 2,000 records per second.

In the first test, we ran a producer writing batches of 30 records, each being 100 KB, using the PutRecords API. As you can see in the graph on the left of the following figure, our WriteProvisionedThroughputExceeded error count went up. The graph on the right shows that we're reaching the 2 MB/s limit, but our incoming records rate is much lower than the 2,000 records per second limit (Kinesis metrics are published at 1-minute intervals, hence 125.8 and 120,000 as the upper limits).

Record size based throttling example

The following figures show how the same three metrics changed when we modified the producer to write batches of 500 records, each being 50 bytes, in the second test. This time, we exceeded the 2,000 records per second throughput limit, but our incoming bytes rate is well under the limit.

Record count based throttling

Now that we know the problem exists, we should look for clues to determine whether we're exceeding the overall throughput available in the stream, or whether we have a hot shard issue due to an imbalanced partition key distribution, as discussed earlier. One approach is to use enhanced shard-level metrics. Prior to our tests, we enabled enhanced shard-level metrics, and we can see in the following figure that both shards equally reached their quota in our first test.

Enhanced shard level metrics

We have seen Kinesis data streams containing thousands of shards harnessing the power of infinite scale in Kinesis data streams. However, plotting enhanced shard-level metrics on such a large stream may not provide an easy way to find out which shards are over-utilized. In that case, it's better to use CloudWatch Metrics Insights to run queries that view the top-n items, as shown in the following code (adjust the LIMIT 5 clause accordingly):

-- Show top 5 shards with highest incoming bytes
SELECT SUM(IncomingBytes)
FROM "AWS/Kinesis"
GROUP BY ShardId, StreamName
ORDER BY MAX() DESC
LIMIT 5

-- Show top 5 shards with highest incoming records
SELECT SUM(IncomingRecords)
FROM "AWS/Kinesis"
GROUP BY ShardId, StreamName
ORDER BY MAX() DESC
LIMIT 5

Enhanced shard-level metrics aren't enabled by default. If you didn't enable them and you want to perform root cause analysis after an incident, this option isn't very helpful. In addition, you can only query the most recent 3 hours of data. Enhanced shard-level metrics also incur additional CloudWatch costs, and it may be cost prohibitive to keep them always on in data streams with hundreds of shards.

One interesting scenario is when the workload is bursty, which can make the resulting CloudWatch metrics graphs rather baffling. This is because Kinesis publishes CloudWatch metric data aggregated at 1-minute intervals. Consequently, although you can see write throughput exceeded errors, your incoming bytes/records graphs may still be within the limits. To illustrate this scenario, we changed our test to create a burst of writes exceeding the limits and then sleep for a few seconds. We then repeated this cycle for several minutes to yield the graphs in the following figure, which show write throughput exceeded errors on the left, while the IncomingBytes and IncomingRecords graphs on the right seem fine.
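A small back-of-the-envelope simulation shows the effect: a shard can be throttled for several seconds while its 1-minute CloudWatch datapoint still sits comfortably under the quota. The traffic shape below is hypothetical:

```python
# Hypothetical per-second ingest (MB) for one shard: a 10-second burst
# at 2 MB/s, then idle for the remaining 50 seconds of the minute.
per_second = [2.0] * 10 + [0.0] * 50
LIMIT_MB_PER_SEC = 1.0

throttled_seconds = sum(1 for mb in per_second if mb > LIMIT_MB_PER_SEC)
minute_total = sum(per_second)  # what the 1-minute CloudWatch datapoint reports

print(throttled_seconds)                    # 10 seconds of throttling
print(minute_total, LIMIT_MB_PER_SEC * 60)  # 20.0 MB ingested vs a 60.0 MB/minute ceiling
```

The per-second quota was breached for a sixth of the minute, yet the aggregated datapoint shows only a third of the allowed volume, which is exactly the confusing picture described above.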

Effect of data aggregated at 1-minute intervals

To simplify the process of identifying write throughput exceeded errors, we developed a CLI tool called Kinesis Hot Shard Advisor (KHS). With KHS, you can view shard utilization even when shard-level metrics aren't enabled. This is particularly useful for investigating an issue retrospectively. It can also show the keys most frequently written to a particular shard. KHS reports shard utilization by reading records and aggregating them per 1-second interval based on the ApproximateArrivalTimestamp in the record. Because of this, you can also understand shard utilization drivers during bursty write workloads.

By running the following command, we can get KHS to inspect the records that arrived during 1 minute of our first test and generate a report:

khs -stream my-data-stream -from "2023-06-22 17:35:00" -to "2023-06-22 17:36:00"

For the given time window, the summary section of the generated report shows the maximum bytes per second rate observed, total bytes ingested, maximum records per second observed, and the total number of records ingested for each shard.

KHS report summary

Choosing a shard ID in the first column will display a graph of incoming bytes and records for that shard. This is similar to the graph you get in CloudWatch metrics, except that the KHS graph reports on a per-second basis. For instance, in the following figure, we can see how the producer was going through a series of bursty writes followed by a throttling event during our test case.

KHS shard level metrics display

Running the same command with the -aggregate-key option enables partition key distribution analysis. It generates an additional graph for each shard showing the key distribution, as shown in the following figure. For our test scenario, we can only see each key being used one time because we used a new UUID for each record.

KHS key distribution graph

Because KHS reports based on data stored in streams, it creates an enhanced fan-out consumer at startup to avoid consuming the read throughput quota available to other consumers. When the analysis is complete, it deletes that enhanced fan-out consumer.

Due to its nature of reading data streams, KHS can transfer a lot of data during analysis. For instance, assume you have a stream with 100 shards. If all of them are fully utilized during a 1-minute window specified using the -from and -to arguments, the host running KHS will receive at least 1 MB/s * 100 * 60 = 6,000 MB, or roughly 6 GB of data. To avoid this kind of excessive data transfer and speed up the analysis process, we recommend first using the WriteProvisionedThroughputExceeded CloudWatch metric to identify a time period when you experienced throttling, and then using a small window (such as 10 seconds) with KHS. You can also run KHS on an Amazon Elastic Compute Cloud (Amazon EC2) instance in the same AWS Region as your Kinesis data stream to minimize network latency during reads.

KHS is designed to run on a single machine to diagnose large-scale workloads. Using a naive in-memory counting algorithm (such as a hash map storing the partition key and count) for partition key distribution analysis could easily exhaust the available memory on the host system. Therefore, we use a probabilistic data structure called a count-min sketch to estimate the number of times a key has been used. As a result, the number you see in the report should be taken as an approximate value rather than an absolute value. After all, with this report, we just want to find out whether there's an imbalance in the keys written to a shard.
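For illustration, here is a minimal count-min sketch in Python. This is a generic sketch of the data structure, not KHS's actual implementation; the width and depth parameters are arbitrary:

```python
import hashlib

class CountMinSketch:
    """Minimal count-min sketch: estimates key frequencies in fixed memory
    (width * depth counters, regardless of how many distinct keys arrive).
    Estimates never undercount; hash collisions can only overcount."""

    def __init__(self, width: int = 2048, depth: int = 4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _buckets(self, key: str):
        # One independent-ish hash per row, derived by salting MD5 with the row index
        for row in range(self.depth):
            digest = hashlib.md5(f"{row}:{key}".encode()).digest()
            yield row, int.from_bytes(digest, "big") % self.width

    def add(self, key: str, count: int = 1) -> None:
        for row, col in self._buckets(key):
            self.table[row][col] += count

    def estimate(self, key: str) -> int:
        # The minimum across rows is the least-collided (tightest) estimate
        return min(self.table[row][col] for row, col in self._buckets(key))

sketch = CountMinSketch()
for _ in range(1_000):
    sketch.add("hot-key")
sketch.add("rare-key")
print(sketch.estimate("hot-key"), sketch.estimate("rare-key"))
```

The estimate for a hot key stays close to its true count while memory stays constant, which is exactly the trade-off needed for spotting key imbalance on a single machine.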

Now that we understand what causes hot shards and how to identify them, let's look at how to handle this in producer applications and what remediation steps to take.

Remediation steps

Having producers retry writes is a step towards making our producers resilient to write throughput exceeded errors. Consider our earlier sample application logging performance metrics data for each web request served by a fleet of web servers. When implementing this retry mechanism, you should remember that records not yet written to the Kinesis data stream are held in the host system's memory. The first issue with this is, if the host crashes before the records can be written, you'll experience data loss. Scenarios such as tracking web request performance data might be more forgiving of this type of data loss than scenarios like financial transactions. You should evaluate the durability guarantees required for your application and employ techniques to achieve them.

The second issue is that records waiting to be written to the Kinesis data stream consume the host system's memory. When you start getting throttled and have retry logic in place, you should notice your memory usage going up. A retry mechanism should have a way to avoid exhausting the host system's memory.
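One way to bound that memory usage is a fixed-capacity buffer with an explicit overflow policy. The sketch below is hypothetical helper code, not part of any AWS SDK; it drops the oldest record when full, and whether dropping, blocking, or spilling to disk is acceptable depends on the durability requirements discussed above:

```python
import collections

class BoundedRetryBuffer:
    """Fixed-capacity buffer for records awaiting (re)write to a stream.
    When full, the oldest record is evicted so a prolonged throttling
    episode cannot exhaust host memory."""

    def __init__(self, max_records: int):
        self.pending = collections.deque(maxlen=max_records)
        self.dropped = 0  # records sacrificed to the overflow policy

    def add(self, record: bytes) -> None:
        if len(self.pending) == self.pending.maxlen:
            self.dropped += 1  # the deque evicts the oldest entry on append
        self.pending.append(record)

    def next_batch(self, max_batch: int = 500) -> list:
        """Dequeue up to max_batch records (500 is the PutRecords maximum);
        throttled records can be re-added via add()."""
        return [self.pending.popleft() for _ in range(min(max_batch, len(self.pending)))]

buf = BoundedRetryBuffer(max_records=3)
for i in range(5):
    buf.add(f"record-{i}".encode())
print(buf.dropped, len(buf.pending))  # 2 3 -> two oldest records were evicted
```

Tracking the dropped counter (and alarming on it) makes the data-loss trade-off visible instead of silent.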

With appropriate retry logic in place, if you receive write throughput exceeded errors, you can use the techniques we discussed earlier to identify the cause. After you identify the root cause, you can choose the appropriate remediation step:

  • If the producer application is exceeding the overall stream's throughput, you can add more shards to the stream to increase its write throughput capacity. When adding shards, the Kinesis data stream makes the new shards available incrementally, minimizing the time that producers experience write throughput exceeded errors. To add shards to a stream, you can use the Kinesis console, the update-shard-count operation in the AWS CLI, the UpdateShardCount API through the AWS SDK, or the ShardCount property in the AWS CloudFormation template used to create the stream.
  • If the producer application is exceeding the throughput limit of some shards (hot shard issue), pick one of the following options based on consumer requirements:
    • If locality of data is required (records with the same partition key are always processed by the same consumer) or an order based on the partition key is required, use the split-shard operation in the AWS CLI or the SplitShard API in the AWS SDK to split those shards.
    • If locality or order based on the current partition key is not required, change the partition key scheme to increase its distribution.
  • If the producer application is exceeding the throughput limit of a shard due to a single partition key (hot key issue), change the partition key scheme to increase its distribution.
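When splitting a hot shard evenly, the SplitShard call needs a NewStartingHashKey at the midpoint of the parent shard's hash key range. A sketch of that calculation, using the first shard's range from the describe-stream output earlier in this post:

```python
def split_hash_key(starting_hash_key: int, ending_hash_key: int) -> int:
    """Midpoint of a shard's hash key range, suitable as the
    NewStartingHashKey for an even two-way split."""
    return (starting_hash_key + ending_hash_key + 1) // 2

# Range of shardId-000000000000 from the earlier describe-stream output
new_start = split_hash_key(0, 85070591730234615865843651857942052863)
print(new_start)  # 42535295865117307932921825928971026432
```

You would then pass this value to `aws kinesis split-shard --stream-name my-data-stream --shard-to-split shardId-000000000000 --new-starting-hash-key <value>`. A midpoint split assumes keys are spread evenly within the range; if one hot key dominates, splitting won't help, which is why the hot key case above calls for a partition key change instead.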

Kinesis Data Streams also has an on-demand capacity mode. In on-demand capacity mode, Kinesis Data Streams automatically scales streams when needed. Additionally, you can switch between on-demand and provisioned capacity modes without causing an outage. This can be particularly useful when you're experiencing write throughput exceeded errors but need to respond quickly to keep your application available to your users. In such instances, you can switch a provisioned capacity mode data stream to an on-demand data stream and let Kinesis Data Streams handle the required scale appropriately. You can then perform root cause analysis in the background and take corrective actions. Finally, if necessary, you can change the capacity mode back to provisioned.

Conclusion

You should now have a solid understanding of the common causes of write throughput exceeded errors in Kinesis data streams, how to diagnose them, and what actions to take to appropriately deal with them. We hope that this post will help you make your Kinesis Data Streams applications more robust. If you are just starting out with Kinesis Data Streams, we recommend referring to the Developer Guide.

If you have any questions or feedback, please leave them in the comments section.


About the Authors

Buddhike de Silva is a Senior Specialist Solutions Architect at Amazon Web Services. Buddhike helps customers run large-scale streaming analytics workloads on AWS and make the best out of their cloud journey.

Nihar Sheth is a Senior Product Manager at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals.
