AWS Glue mutual TLS authentication for Amazon MSK

In today's landscape, data streams continuously from numerous sources, from social media interactions to Internet of Things (IoT) device readings. This torrent of real-time information presents both a challenge and an opportunity for businesses. To harness the power of this data effectively, organizations need robust systems for ingesting, processing, and analyzing streaming data at scale. Enter Apache Kafka: a distributed streaming platform that has revolutionized how companies handle real-time data pipelines and build responsive, event-driven applications. AWS Glue is used to process and analyze large volumes of real-time data and perform complex transformations on streaming data from Apache Kafka.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed Apache Kafka service. You can enable a combination of authentication modes on new or existing MSK clusters. The supported authentication modes are AWS Identity and Access Management (IAM) access control, mutual Transport Layer Security (TLS), and Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM). For more information about using IAM authentication, refer to Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication.

Mutual TLS authentication requires both the server and the client to present certificates to prove their identity. It's ideal for hybrid applications that need a common authentication model. It's also a commonly used authentication mechanism for business-to-business applications, and it appears in standards such as open banking, which enables secure open API integrations for financial institutions. For Amazon MSK, AWS Private Certificate Authority (AWS Private CA) is used to issue the X.509 certificates and to authenticate clients.

This post describes how to set up AWS Glue jobs to produce, consume, and process messages on an MSK cluster using mutual TLS authentication. AWS Glue automatically infers the schema from the streaming data and stores the metadata in the AWS Glue Data Catalog for analysis with analytics tools such as Amazon Athena.

Example use case

In our example use case, a hospital facility continuously monitors the body temperatures of patients admitted to the emergency ward using smart thermometers. Each device automatically records a patient's temperature readings and posts the data to a central monitoring application API. Each posted record is a JSON formatted message that contains the deviceId that uniquely identifies the thermometer, a patientId to identify the patient, the patient's temperature reading, and the eventTime when the temperature was recorded.

Record schema
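For illustration, a single posted reading looks similar to the following. The values shown here are made up, but the fields match what the simulator script later in this post produces:

{
  "deviceId": "0d3fbb62-4d0f-4f78-9a1b-2f4f3e8a7c11",
  "patientId": "PI00007",
  "temperature": 38.4,
  "eventTime": "2024-05-14 10:32:45.123456"
}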

The central monitoring application checks the hourly average temperature readings for each patient and notifies the hospital's healthcare workers when a patient's average temperature falls outside the accepted range (36.1–37.2°C). In our case, we use the Athena console to analyze the readings.

Overview of the solution

In this post, we use an AWS Glue Python shell job to simulate incoming data from the hospital thermometers. This job produces messages that are securely written to an MSK cluster using mutual TLS authentication.

To process the streaming data from the MSK cluster, we deploy an AWS Glue streaming extract, transform, and load (ETL) job. This job automatically infers the schema from the incoming data, stores the schema metadata in the Data Catalog, and then stores the processed data as efficient Parquet files in Amazon Simple Storage Service (Amazon S3). We use Athena to query the output table in the Data Catalog and discover insights.

The following diagram illustrates the architecture of the solution.

Solution architecture

The solution workflow consists of the following steps:

  1. Create a private certificate authority (CA) using AWS Certificate Manager (ACM).
  2. Set up an MSK cluster with mutual TLS authentication.
  3. Create a Java keystore (JKS) file and generate a client certificate and private key.
  4. Create a Kafka connection in AWS Glue.
  5. Create a Python shell job in AWS Glue to create a topic and push messages to Kafka.
  6. Create an AWS Glue Streaming job to consume and process the messages.
  7. Analyze the processed data in Athena.

Prerequisites

You should have the following prerequisites:

CloudFormation stack set

This template creates two NAT gateways as shown in the following diagram. However, it's possible to route the traffic to a single NAT gateway in one Availability Zone for test and development workloads. For redundancy in production workloads, it's recommended to have one NAT gateway available in each Availability Zone.

VPC setup

The stack also creates a security group with a self-referencing rule to allow communication between AWS Glue components.
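If you build the VPC and security group yourself rather than using the provided template, you can add a self-referencing ingress rule with the AWS CLI along the lines of the following sketch (the security group ID is a placeholder):

aws ec2 authorize-security-group-ingress \
  --group-id sg-0123456789abcdef0 \
  --ip-permissions 'IpProtocol=-1,UserIdGroupPairs=[{GroupId=sg-0123456789abcdef0}]'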

Create a private CA using ACM

Complete the following steps to create a root CA. For more details, refer to Creating a private CA.

  1. On the AWS Private CA console, choose Create a private CA.
  2. For Mode options, select either General-purpose or Short-lived certificate for lower pricing.
  3. For CA type options, select Root.
  4. Provide the certificate details by entering at least one distinguished name.

Create private CA

  5. Leave the remaining options at their defaults and select the acknowledge checkbox.
  6. Choose Create CA.
  7. On the Actions menu, choose Install CA certificate and choose Confirm and install.

Install certificate

Set up an MSK cluster with mutual TLS authentication

Before setting up the MSK cluster, make sure you have a VPC with at least two private subnets in different Availability Zones and a NAT gateway with a route to the internet. A CloudFormation template is provided in the prerequisites section.

Complete the following steps to set up your cluster:

  1. On the Amazon MSK console, choose Create cluster.
  2. For Creation method, select Custom create.
  3. For Cluster type, select Provisioned.
  4. For Broker size, you can choose kafka.t3.small for the purposes of this post.
  5. For Number of zones, choose 2.
  6. Choose Next.
  7. In the Networking section, select the VPC, private subnets, and security group you created in the prerequisites section.
  8. In the Security settings section, under Access control methods, select TLS client authentication through AWS Certificate Manager (ACM).
  9. For AWS Private CAs, choose the AWS Private CA you created earlier.

The MSK cluster creation can take up to 30 minutes to complete.
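You can check the cluster state from the AWS CLI and, once the cluster is ACTIVE, retrieve the TLS bootstrap broker string that clients connect to (replace the cluster ARN placeholder with your own):

aws kafka describe-cluster --cluster-arn <your-cluster-arn> --query 'ClusterInfo.State'
aws kafka get-bootstrap-brokers --cluster-arn <your-cluster-arn> --query 'BootstrapBrokerStringTls'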

Create a JKS file and generate a client certificate and private key

Using the root CA, you generate client certificates to use for authentication. The following instructions are for CloudShell, but they can also be adapted for a client machine with Java and the AWS CLI installed.

  1. Open a new CloudShell session and run the following commands to create the certs directory and install Java:
mkdir certs
cd certs
sudo yum -y install java-11-amazon-corretto-headless

  2. Run the following command to create a keystore file with a private key in JKS format. Replace Distinguished-Name, Example-Alias, Your-Store-Pass, and Your-Key-Pass with strings of your choice:

keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=Distinguished-Name" -alias Example-Alias -storetype pkcs12

  3. Generate a certificate signing request (CSR) with the private key created in the preceding step:

keytool -keystore kafka.client.keystore.jks -certreq -file csr.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass

  4. Run the following command to remove the word NEW (and the single space that follows it) from the beginning and end of the file:

sed -i -E '1,$ s/NEW //' csr.pem

The file should start with -----BEGIN CERTIFICATE REQUEST----- and end with -----END CERTIFICATE REQUEST-----.
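Optionally, you can confirm that the CSR parses correctly before submitting it:

openssl req -in csr.pem -noout -text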

  5. Using the CSR file, create a client certificate with the following command. Replace Private-CA-ARN with the ARN of the private CA you created.

aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://csr.pem --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"

The command prints the ARN of the issued certificate. Save the CertificateArn value for use in the next step.

{
"CertificateArn": "arn:aws:acm-pca:region:account:certificate-authority/CA_ID/certificate/certificate_ID"
}

  6. Use the Private-CA-ARN together with the CertificateArn (arn:aws:acm-pca:<region>:...) generated in the preceding step to retrieve the signed client certificate. This creates a client-cert.pem file.

aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN | jq -r '.Certificate + "\n" + .CertificateChain' >> client-cert.pem

  7. Add the certificate into the Java keystore so you can present it when you talk to the MSK brokers:

keytool -keystore kafka.client.keystore.jks -import -file client-cert.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass -noprompt

  8. Extract the private key from the JKS file. Provide the same destkeypass and deststorepass, and enter the keystore password when prompted.

keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore keystore.p12 -srcalias Example-Alias -deststorepass Your-Store-Pass -destkeypass Your-Key-Pass -deststoretype PKCS12

  9. Convert the private key to PEM format. Enter the keystore password you provided in the previous step when prompted.

openssl pkcs12 -in keystore.p12 -nodes -nocerts -out private-key.pem

  10. Remove the lines that begin with Bag Attributes from the top of the file:

sed -i -ne '/-BEGIN PRIVATE KEY-/,/-END PRIVATE KEY-/p' private-key.pem

  11. Upload the client-cert.pem, kafka.client.keystore.jks, and private-key.pem files to Amazon S3. You can either create a new S3 bucket or use an existing bucket to store these objects. Replace s3://aws-glue-assets-11111111222222-us-east-1/certs/ with your S3 location.

aws s3 sync ~/certs s3://aws-glue-assets-11111111222222-us-east-1/certs/ --exclude '*' --include 'client-cert.pem' --include 'private-key.pem' --include 'kafka.client.keystore.jks'

Create a Kafka connection in AWS Glue

Complete the following steps to create a Kafka connection:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Select Apache Kafka and choose Next.
  4. For Amazon Managed Streaming for Apache Kafka Cluster, choose the MSK cluster you created earlier.

Create Glue Kafka connection

  5. For Authentication method, choose TLS client authentication.
  6. Enter the S3 path to the keystore you created earlier, and provide the keystore and client key passwords you used for the -storepass and -keypass options.

Add authentication method to connection

  7. Under Networking options, choose your VPC, a private subnet, and a security group. The security group must contain a self-referencing rule.
  8. On the next page, provide a name for the connection (for example, Kafka-connection) and choose Create connection.
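You can verify the connection and inspect the bootstrap servers that AWS Glue discovered from the cluster with the AWS CLI (using the connection name chosen above):

aws glue get-connection --name Kafka-connection --hide-password

The output includes the KAFKA_BOOTSTRAP_SERVERS connection property, which the producer script in the next section reads at run time.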

Create a Python shell job in AWS Glue to create a topic and push messages to Kafka

In this section, you create a Python shell job to create a new Kafka topic and push JSON messages to it. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs.
  2. In the Script section, for Engine, choose Python shell.
  3. Choose Create script.

Create Python shell job

  4. Enter the following script in the editor:
import sys
from awsglue.utils import getResolvedOptions
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
from kafka.errors import TopicAlreadyExistsError
from urllib.parse import urlparse

import json
import uuid
import datetime
import boto3
import time
import random

# Fetch job parameters
args = getResolvedOptions(sys.argv, ['connection-names', 'client-cert', 'private-key'])

# Download the client certificate and private key files from S3
TOPIC = 'example_topic'
client_cert = urlparse(args['client_cert'])
private_key = urlparse(args['private_key'])

s3 = boto3.client('s3')
s3.download_file(client_cert.netloc, client_cert.path.lstrip('/'), client_cert.path.split('/')[-1])
s3.download_file(private_key.netloc, private_key.path.lstrip('/'), private_key.path.split('/')[-1])

# Fetch bootstrap servers from the connection attached to the job
args = getResolvedOptions(sys.argv, ['connection-names'])
if ',' in args['connection_names']:
    raise ValueError("Choose only one connection name in the job details tab!")
glue_client = boto3.client('glue')
response = glue_client.get_connection(Name=args['connection_names'], HidePassword=True)
bootstrapServers = response['Connection']['ConnectionProperties']['KAFKA_BOOTSTRAP_SERVERS']

# Create the topic if it doesn't already exist
admin_client = KafkaAdminClient(bootstrap_servers=bootstrapServers, security_protocol="SSL", ssl_certfile=client_cert.path.split('/')[-1], ssl_keyfile=private_key.path.split('/')[-1])
try:
    admin_client.create_topics(new_topics=[NewTopic(name=TOPIC, num_partitions=1, replication_factor=1)], validate_only=False)
except TopicAlreadyExistsError:
    # Topic already exists
    pass
admin_client.close()

# Generate JSON messages for the new topic
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'), bootstrap_servers=bootstrapServers, security_protocol="SSL",
                         ssl_check_hostname=True, ssl_certfile=client_cert.path.split('/')[-1], ssl_keyfile=private_key.path.split('/')[-1])

for i in range(1200):
    _event = {
        "deviceId": str(uuid.uuid4()),
        "patientId": "PI" + str(random.randint(1, 15)).rjust(5, '0'),
        "temperature": round(random.uniform(32.1, 40.9), 1),
        "eventTime": str(datetime.datetime.now())
    }
    producer.send(TOPIC, _event)
    time.sleep(3)

producer.close()

  5. On the Job details tab, provide a name for your job, such as Kafka-msk-producer.
  6. Choose an IAM role. If you don't have one, create one following the instructions in Configuring IAM permissions for AWS Glue.
  7. Under Advanced properties, for Connections, choose the Kafka-connection connection you created.
  8. Under Job parameters, add the following parameters and values:
    1. Key: --additional-python-modules, value: kafka-python.
    2. Key: --client-cert, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/client-cert.pem. Replace this with your client-cert.pem Amazon S3 location from earlier.
    3. Key: --private-key, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/private-key.pem. Replace this with your private-key.pem Amazon S3 location from earlier.
      AWS Glue Job parameters
  9. Save and run the job.

You can confirm that the job run status is Running on the Runs tab.

At this point, we've successfully created a Python shell job to simulate the thermometers sending temperature readings to the monitoring application. The job runs for approximately 1 hour and pushes 1,200 records to Amazon MSK.
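If you prefer the AWS CLI, you can start the producer and check its latest run state with commands along these lines (using the job name chosen above):

aws glue start-job-run --job-name Kafka-msk-producer
aws glue get-job-runs --job-name Kafka-msk-producer --max-results 1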

Alternatively, you can replace the Python shell job with a Scala ETL job that acts as a producer and sends messages to the MSK cluster. In that case, use the JKS file for authentication with ssl.keystore.type=JKS. If you're using keys in PEM format, note that the current version of the Kafka client libraries (2.4.1) installed in AWS Glue version 4 doesn't yet support authentication through certificates in PEM format (as of this writing).
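As a rough sketch, a Scala producer built on the Kafka client library would authenticate with the JKS keystore through SSL properties similar to the following (paths and passwords are placeholders):

security.protocol=SSL
ssl.keystore.type=JKS
ssl.keystore.location=/path/to/kafka.client.keystore.jks
ssl.keystore.password=Your-Store-Pass
ssl.key.password=Your-Key-Pass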

Create an AWS Glue Streaming job to consume and process the messages

You can now create an AWS Glue ETL job to consume and process the messages in the Kafka topic. AWS Glue automatically infers the schema from the data. Complete the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. Choose Visual ETL to author a new job.
  3. For Sources, choose Apache Kafka.
  4. For Connection name, choose the node and the connection name you created earlier.
  5. For Topic name, enter the topic name (example_topic) you created earlier.
  6. Leave the rest of the options at their defaults.

Kafka data source

  7. Add a new target node called Amazon S3 to store the output Parquet files generated from the streaming data.
  8. Choose Parquet as the data format and provide an S3 output location for the generated files.
  9. Select the option to allow AWS Glue to create a table in the Data Catalog, and provide the database and table names.

S3 Output node

  10. On the Job details tab, provide the following options:
    1. For Requested number of workers, enter 2.
    2. For IAM Role, choose an IAM role with permissions to read and write to the S3 output location.
    3. For Job timeout, enter 60 (so the job stops after 60 minutes).
    4. Under Advanced properties, for Connections, choose the connection you created.
  11. Save and run the job.

You can confirm that new Parquet files are created in the S3 output location under the prefixes s3://<output-location>/ingest_year=XXXX/ingest_month=XX/ingest_day=XX/ingest_hour=XX/.

At this point, you have created a streaming job that processes events from Amazon MSK and stores the JSON formatted records as Parquet files in Amazon S3. AWS Glue streaming jobs are meant to run continuously to process streaming data. We have set the timeout to stop the job after 60 minutes. You can also stop the job manually after the records have been processed to Amazon S3.
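For reference, the script that the visual editor generates reads from the Kafka connection in a form similar to the following sketch. The connection, topic, and node names reflect the values used in this post, and the exact generated code may differ:

import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate())

# Streaming DataFrame sourced from the MSK topic through the AWS Glue connection
kafka_df = glueContext.create_data_frame.from_options(
    connection_type="kafka",
    connection_options={
        "connectionName": "Kafka-connection",
        "classification": "json",
        "startingOffsets": "earliest",
        "topicName": "example_topic",
        "inferSchema": "true",
        "typeOfData": "kafka",
    },
    transformation_ctx="kafka_source",
)

The generated job then processes each micro-batch with glueContext.forEachBatch and writes the results to the S3 target using the ingest date partitioning shown above.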

Analyze the data in Athena

Returning to our example use case, you can run the following query in Athena to monitor and track the hourly average temperature readings for patients that fall outside the normal range (36.1–37.2°C):

SELECT
date_format(parse_datetime(eventTime, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), '%h %p') hour,
patientId,
round(avg(temperature), 1) average_temperature,
count(temperature) readings
FROM "default"."devices_data"
GROUP BY 1, 2
HAVING avg(temperature) > 37.2 or avg(temperature) < 36.1
ORDER BY 2, 1 DESC

Amazon Athena Console

Run the query several times and observe how the average_temperature and the number of readings change with new incoming data from the AWS Glue Streaming job. In our example scenario, healthcare workers can use this information to identify patients who are experiencing consistently high or low body temperatures and give them the required attention.

At this point, we have successfully created and ingested streaming data to our MSK cluster using mutual TLS authentication. We only needed the certificates generated by AWS Private CA to authenticate our AWS Glue clients to the MSK cluster and process the streaming data with an AWS Glue Streaming job. Finally, we used Athena to visualize the data and observed how it changes in near real time.

Clean up

To clean up the resources created in this post, complete the following steps:

  1. Delete the private CA you created.
  2. Delete the MSK cluster you created.
  3. Delete the AWS Glue connection you created.
  4. Stop the jobs if they are still running, and delete the jobs you created.
  5. If you used the CloudFormation stack provided in the prerequisites, delete it to remove the VPC and other networking components.

Conclusion

This post demonstrated how you can use AWS Glue to consume, process, and store streaming data from Amazon MSK using mutual TLS authentication. AWS Glue Streaming automatically infers the schema and creates a table in the Data Catalog. You can then query the table using other data analysis tools such as Athena, Amazon Redshift, and Amazon QuickSight to gain insights into the streaming data.

Try out the solution for yourself, and let us know your questions and feedback in the comments section.


About the Authors

Edward Okemwa Ondari is a Big Data Cloud Support Engineer (ETL) at AWS Nairobi specializing in AWS Glue and Amazon Athena. He is dedicated to providing customers with technical guidance and resolving issues related to processing and analyzing large volumes of data. In his free time, he enjoys singing choral music and playing soccer.

Emmanuel Mashandudze is a Senior Big Data Cloud Engineer specializing in AWS Glue. He collaborates with product teams to help customers efficiently transform data in the cloud and helps them design and implement robust data pipelines. Outside of work, Emmanuel is an avid marathon runner and sports enthusiast, and he enjoys creating memories with his family.
