Ship Amazon CloudWatch logs to Amazon OpenSearch Serverless

Amazon CloudWatch Logs collects, aggregates, and analyzes logs from different systems in one place. CloudWatch offers subscriptions as a real-time feed of these logs to other services such as Amazon Kinesis Data Streams, AWS Lambda, and Amazon OpenSearch Service. These subscriptions are a popular mechanism for enabling custom processing and advanced analysis of log data to gain more valuable insights. At the time of publishing this blog post, these subscription filters support delivering logs to Amazon OpenSearch Service provisioned clusters only. Customers are increasingly adopting Amazon OpenSearch Serverless as a cost-effective option for infrequent, intermittent, and unpredictable workloads.

In this blog post, we show how to use Amazon OpenSearch Ingestion to deliver CloudWatch logs to OpenSearch Serverless in near real time. We outline a mechanism to connect a Lambda subscription filter with OpenSearch Ingestion and deliver logs to OpenSearch Serverless without explicitly needing a separate subscription filter for it.

Solution overview

The following diagram illustrates the solution architecture.

  1. CloudWatch Logs: Collects and stores logs from various AWS sources and applications. It serves as the source of log data in this solution.
  2. Subscription filter: A CloudWatch Logs subscription filter filters and routes specific log data from CloudWatch Logs to the next component in the pipeline.
  3. CloudWatch exporter Lambda function: This Lambda function receives the filtered log data from the subscription filter. Its role is to transform and prepare the log data for ingestion into the OpenSearch Ingestion pipeline.
  4. OpenSearch Ingestion: This is a component of OpenSearch Service. The ingestion pipeline is responsible for processing and enriching the log data received from the CloudWatch exporter Lambda function before storing it in the OpenSearch Serverless collection.
  5. OpenSearch Service: This fully managed service stores and indexes log data, making it searchable and available for analysis and visualization. OpenSearch Service offers two configurations: provisioned domains and serverless. In this setup, we use serverless, an auto-scaling configuration for OpenSearch Service.

Prerequisites

Deploy the solution

With the prerequisites in place, you can create and deploy the pieces of the solution.

Step 1: Create PipelineRole for ingestion

  • Open the AWS Management Console for AWS Identity and Access Management (IAM).
  • Choose Policies, and then choose Create policy.
  • Select JSON and paste the following policy into the editor:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "aoss:BatchGetCollection",
                "aoss:APIAccessAll"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:aoss:us-east-1:{accountId}:collection/{collectionId}"
        },
        {
            "Action": [
                "aoss:CreateSecurityPolicy",
                "aoss:GetSecurityPolicy",
                "aoss:UpdateSecurityPolicy"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aoss:collection": "{collection}"
                }
            }
        }
    ]
}

// Replace {accountId}, {collectionId}, and {collection} with your own values

  • Choose Next, choose Next again, and name your policy collection-pipeline-policy.
  • Choose Create policy.
  • Next, create a role and attach the policy to it. Choose Roles, and then choose Create role.
  • Select Custom trust policy and paste the following policy into the editor:
{
   "Model":"2012-10-17",
   "Assertion":[
      {
         "Effect":"Allow",
         "Principal":{
            "Service":"osis-pipelines.amazonaws.com"
         },
         "Action":"sts:AssumeRole"
      }
   ]
}

  • Choose Next, and then search for and select the collection-pipeline-policy you just created.
  • Choose Next and name the role PipelineRole.
  • Choose Create role.
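
If you prefer the AWS CLI, the following sketch performs the same steps; it assumes the permissions policy and trust policy above are saved locally as policy.json and trust-policy.json (illustrative file names):

aws iam create-policy \
    --policy-name collection-pipeline-policy \
    --policy-document file://policy.json

aws iam create-role \
    --role-name PipelineRole \
    --assume-role-policy-document file://trust-policy.json

aws iam attach-role-policy \
    --role-name PipelineRole \
    --policy-arn arn:aws:iam::{accountId}:policy/collection-pipeline-policy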

Step 2: Configure the network and data policy for the OpenSearch collection

  • In the OpenSearch Service console, navigate to the Serverless menu.
  • Create a VPC endpoint by following the instructions in Create an interface endpoint for OpenSearch Serverless.
  • Go to Security and choose Network policies.
  • Choose Create network policy.
  • Configure the following policy:
[
  {
    "Rules": [
      {
        "Resource": [
          "collection/{collection name}"
        ],
        "ResourceType": "assortment"
      }
    ],
    "AllowFromPublic": false,
    "SourceVPCEs": [
      "{VPC Enddpoint Id}"
    ]
  },
  {
    "Guidelines": [
      {
        "Resource": [
          "collection/{collection name}"
        ],
        "ResourceType": "dashboard"
      }
    ],
    "AllowFromPublic": true
  }
]

  • Go to Security and choose Data access policies.
  • Choose Create access policy.
  • Configure the following policy:
[
  {
    "Rules": [
      {
        "Resource": [
          "index/{collection name}/*"
        ],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:UpdateIndex",
          "aoss:DescribeIndex",
          "aoss:ReadDocument",
          "aoss:WriteDocument"
        ],
        "ResourceType": "index"
      }
    ],
    "Principal": [
      "arn:aws:iam::{accountId}:role/PipelineRole",
      "arn:aws:iam::{accountId}:role/Admin"
    ],
    "Description": "Rule 1"
  }
]
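
If you prefer the CLI, a sketch of the equivalent calls follows; the policy names and file names are illustrative, and the JSON documents above are assumed to be saved locally:

aws opensearchserverless create-security-policy \
    --name cwlogs-network-policy \
    --type network \
    --policy file://network-policy.json

aws opensearchserverless create-access-policy \
    --name cwlogs-data-policy \
    --type data \
    --policy file://data-policy.json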

Step 3: Create an OpenSearch Ingestion pipeline

  • Navigate to OpenSearch Service.
  • Go to the Ingestion pipelines section.
  • Choose Create pipeline.
  • Define the pipeline configuration:
model: "2"
 cwlogs-ingestion-pipeline:

  supply:

    http:

      path: /logs/ingest

  sink:

    - opensearch:

        # Present an AWS OpenSearch Service area endpoint

        hosts: ["https://{collectionId}.{region}.aoss.amazonaws.com"]

        index: "cwl-%{yyyy-MM-dd}"

        aws:

          # Present a Position ARN with entry to the area. This function ought to have a belief relationship with osis-pipelines.amazonaws.com

          sts_role_arn: "arn:aws:iam::{accountId}:function/PipelineRole"

          # Present the area of the area.

          area: "{area}"

          serverless: true

          serverless_options:

            network_policy_name: "{Community coverage identify}"
 # To get the values for the placeholders: 
 # 1. {collectionId}: You will discover the gathering ID by navigating to the Amazon OpenSearch Serverless Assortment within the AWS Administration Console, after which clicking on the Assortment. The gathering ID is listed below the "Overview" part. 
 # 2. {area}: That is the AWS area the place your Amazon OpenSearch Service area is situated. You will discover this data within the AWS Administration Console once you navigate to the area. 
 # 3. {accountId}: That is your AWS account ID. You will discover your account ID by clicking in your username within the top-right nook of the AWS Administration Console and choosing "My Account" from the dropdown menu. 
 # 4. {Community coverage identify}: That is the identify of the community coverage you might have configured to your Amazon OpenSearch Serverless Assortment. If you have not configured a community coverage, you possibly can depart this placeholder as is or take away it from the configuration.
 # After acquiring the required values, change the placeholders within the configuration with the precise values.            
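
You can also create the pipeline from the CLI. A minimal sketch, assuming the configuration above is saved as pipeline.yaml (the capacity values are illustrative):

aws osis create-pipeline \
    --pipeline-name cwlogs-ingestion-pipeline \
    --min-units 1 \
    --max-units 4 \
    --pipeline-configuration-body file://pipeline.yaml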

Step 4: Create a Lambda function

  • Create a Lambda layer for the requests and sigv4 packages. Run the following commands in AWS CloudShell.
mkdir lambda_layers
cd lambda_layers
mkdir python
cd python
pip install requests -t ./
pip install requests_auth_aws_sigv4 -t ./
cd ..
zip -r python_modules.zip .

aws lambda publish-layer-version --layer-name Data-requests --description "My Python layer" --zip-file fileb://python_modules.zip --compatible-runtimes python3.x
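
  • Create the Lambda function with the layer attached, using the handler code that follows. A minimal CLI sketch, assuming the code below is saved as lambda_function.py and zipped into function.zip (the function name cloud-logs matches Step 5; the runtime, layer version, and execution role are assumptions):

aws lambda create-function \
    --function-name cloud-logs \
    --runtime python3.12 \
    --handler lambda_function.lambda_handler \
    --role arn:aws:iam::{accountId}:role/{lambda execution role} \
    --layers arn:aws:lambda:{region}:{accountId}:layer:Data-requests:1 \
    --zip-file fileb://function.zip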

import base64
import gzip
import json
import logging
from datetime import datetime

import jmespath
import requests
from requests_auth_aws_sigv4 import AWSSigV4

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)


def lambda_handler(event, context):
    # Extract the compressed log data from the event
    data = jmespath.search("awslogs.data", event)

    # Decompress the logs
    cwLogs = decompress_json_data(data)

    # Assemble the payload to send to OpenSearch Ingestion
    payload = prepare_payload(cwLogs)
    print(payload)

    # Ingest the batch of events into the pipeline
    response = ingestData(payload)

    return {
        'statusCode': 200
    }


def decompress_json_data(data):
    compressed_data = base64.b64decode(data)
    uncompressed_data = gzip.decompress(compressed_data)
    return json.loads(uncompressed_data)


def prepare_payload(cwLogs):
    payload = []
    logEvents = cwLogs['logEvents']
    for logEvent in logEvents:
        request = {}
        request['id'] = logEvent['id']
        dt = datetime.fromtimestamp(logEvent['timestamp'] / 1000)
        request['timestamp'] = dt.isoformat()
        request['message'] = logEvent['message']
        request['owner'] = cwLogs['owner']
        request['log_group'] = cwLogs['logGroup']
        request['log_stream'] = cwLogs['logStream']
        payload.append(request)
    return payload


def ingestData(payload):
    ingestionEndpoint = "{OpenSearch Pipeline Endpoint}"
    endpoint = "https://" + ingestionEndpoint
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    r = requests.request('POST', f'{endpoint}/logs/ingest', json=payload, auth=AWSSigV4('osis'), headers=headers)
    LOGGER.info('Response received: ' + r.text)
    return r

  • Replace {OpenSearch Pipeline Endpoint} with the endpoint of your OpenSearch Ingestion pipeline.
  • Attach the following inline policy to the function's execution role.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PermitsWriteAccessToPipeline",
            "Effect": "Allow",
            "Action": "osis:Ingest",
            "Resource": "arn:aws:osis:{region}:{accountId}:pipeline/{OpenSearch Pipeline Name}"
        }
    ]
}
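
A CLI sketch for attaching this inline policy, assuming the JSON above is saved as ingest-policy.json (the file name is illustrative):

aws iam put-role-policy \
    --role-name {lambda execution role} \
    --policy-name PermitsWriteAccessToPipeline \
    --policy-document file://ingest-policy.json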

Step 5: Set up a CloudWatch Logs subscription

  • Grant permission to a specific AWS service or AWS account to invoke the specified Lambda function. The following command grants the CloudWatch Logs service permission to invoke the cloud-logs Lambda function for the specified log group. This is necessary because CloudWatch Logs cannot directly invoke a Lambda function without being granted permission. Run the following command in CloudShell to add the permission.
aws lambda add-permission \
    --function-name "{function name}" \
    --statement-id "{function name}" \
    --principal "logs.amazonaws.com" \
    --action "lambda:InvokeFunction" \
    --source-arn "arn:aws:logs:{region}:{accountId}:log-group:{log_group}:*" \
    --source-account "{accountId}"

  • Create a subscription filter for a log group. The following command creates a subscription filter on the log group, which forwards all log events (because the filter pattern is an empty string) to the Lambda function. Run the following command in CloudShell to create the subscription filter.
aws logs put-subscription-filter \
    --log-group-name {log_group} \
    --filter-name {filter name} \
    --filter-pattern "" \
    --destination-arn arn:aws:lambda:{region}:{accountId}:function:{function name}
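
To confirm the subscription filter is in place, you can list the filters on the log group:

aws logs describe-subscription-filters --log-group-name {log_group}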

Step 6: Testing and verification

  • Generate some logs in your CloudWatch log group. Run the following command in CloudShell to create sample logs in the log group.
aws logs put-log-events --log-group-name {log_group} --log-stream-name {stream_name} --log-events '[{"timestamp": {timestamp in millis}, "message": "Simple Lambda Test"}]'

  • Check the OpenSearch collection to ensure the logs are indexed correctly.
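
For a quick check outside of OpenSearch Dashboards, the following minimal sketch searches the daily indexes over the collection endpoint. It reuses the requests_auth_aws_sigv4 package from Step 4 and assumes your credentials are covered by the data access policy from Step 2:

import requests
from requests_auth_aws_sigv4 import AWSSigV4

# Collection endpoint, as used in the pipeline configuration in Step 3
endpoint = "https://{collectionId}.{region}.aoss.amazonaws.com"

# Search the daily cwl-* indexes for the sample message
r = requests.get(
    f"{endpoint}/cwl-*/_search",
    json={"query": {"match": {"message": "Simple Lambda Test"}}},
    auth=AWSSigV4("aoss", region="{region}"),
    headers={"Content-Type": "application/json"},
)
print(r.json())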

Clean up

Remove the infrastructure for this solution when not in use to avoid incurring unnecessary costs.

Conclusion

You saw how to set up a pipeline to deliver CloudWatch logs to an OpenSearch Serverless collection inside a VPC. This integration uses CloudWatch for log aggregation, Lambda for log processing, and OpenSearch Serverless for querying and visualization. You can use this solution to take advantage of the pay-as-you-go pricing model of OpenSearch Serverless and optimize operational costs for log analysis.

To explore further, you can:


About the Authors

Balaji Mohan is a senior modernization architect specializing in application and data modernization to the cloud. His business-first approach ensures seamless transitions, aligning technology with organizational goals. Using cloud-native architectures, he delivers scalable, agile, and cost-effective solutions, driving innovation and growth.

Souvik Bose is a Software Development Engineer working on Amazon OpenSearch Service.

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.
