
AWS Glue

You can trigger an AWS Glue job based on an event. The following can start a Glue job:

  • CloudWatch Events: You can create a CloudWatch Events rule that triggers a Glue job when a specific event occurs. For example, you could create a rule that triggers a Glue job when a new file is created in an S3 bucket.
  • Step Functions: You can use a Step Functions state machine to trigger a Glue job. For example, you could create a state machine that triggers a Glue job when a specific event occurs in another AWS service.
  • Manually: You can trigger a Glue job by clicking the Run button in the AWS Glue console.

To trigger a Glue job based on an event, create a trigger in the AWS Glue console, specifying both the event you want to trigger on and the job you want to run. The steps are: go to the AWS Glue console, click Triggers, then click Cr...

AWS Lambda
AWS Lambda is a serverless computing service that allows you to run code without provisioning or managing servers. Lambda automatically scales your code based on demand, so you only pay for the compute time you use.

Here are some examples of how you can use AWS Lambda:

  1. Process event data. Lambda can be used to process event data from a variety of sources, such as Amazon CloudWatch logs, Amazon Kinesis streams, and Amazon Simple Notification Service (SNS) topics.
  2. Generate dynamic content. Lambda can be used to generate dynamic content, such as personalized web pages or email messages.
  3. Automate tasks. Lambda can be used to automate tasks, such as sending out marketing emails or updating customer records.
  4. Build microservices. Lambda can be used to build microservices, which are small, independent services that can be easily scaled and deployed.
  5. Create custom APIs. Lambda can be used to create custom APIs that can be used by other applications.
  6. Run batch jobs. Lambda can be used to run batch jobs, such as processing large datasets or generating reports.
  7. Build serverless applications. Lambda can be used to build serverless applications, which are applications that do not require any servers to run.
  8. Secure your applications. Lambda can be used to secure your applications by providing a variety of features, such as VPC integration and API Gateway authentication.
  9. Cost-effectively scale your applications. Lambda can be used to cost-effectively scale your applications by only paying for the compute time you use.

AWS Lambda is a powerful tool that can be used to build a variety of applications. If you are looking for a serverless computing service that is scalable, cost-effective, and secure, then AWS Lambda is a great option.

Context in Lambda

In AWS Lambda, the context object is a special object that is passed to the handler function when the function is invoked. The context object contains information about the invocation, function configuration, and execution environment.

The context object has the following properties:

  • functionName: The name of the Lambda function.
  • functionVersion: The version of the Lambda function.
  • invokedFunctionArn: The Amazon Resource Name (ARN) of the Lambda function.
  • memoryLimitInMB: The amount of memory that is allocated to the Lambda function.
  • awsRequestId: The identifier of the invocation request.
  • logGroupName: The name of the log group for the Lambda function.
  • logStreamName: The name of the log stream for the Lambda function.
  • identity: (mobile apps) Information about the Amazon Cognito identity that authorized the request.

The context object also has the following methods:

  • getRemainingTimeInMillis: Returns the number of milliseconds left before the execution times out.
  • getClientContext: Returns the client context that was supplied by the client application (for example, via the AWS Mobile SDK) when the function was invoked.

The context object can be used to:

  • Get information about the invocation, function configuration, and execution environment.
  • Check the amount of memory that is allocated to the Lambda function.
  • Get the identifier of the invocation request.
  • Get the name of the log group for the Lambda function.
  • Get the name of the log stream for the Lambda function.
  • Get the client context for the Lambda function.

The context object is a powerful tool that can be used to get information about the Lambda function and the execution environment. It can be used to troubleshoot problems, track performance, and customize the behavior of the Lambda function.
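In the Python runtime these fields are exposed as snake_case attributes on the context object. A minimal sketch of reading them inside a handler (the attribute names below are the Python equivalents of the list above):

```python
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # The Python runtime exposes context fields as snake_case attributes.
    info = {
        "function_name": context.function_name,
        "function_version": context.function_version,
        "request_id": context.aws_request_id,
        "memory_mb": context.memory_limit_in_mb,
        "ms_remaining": context.get_remaining_time_in_millis(),
    }
    logger.info("Invocation context: %s", info)
    return info
```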

Here are some additional details about AWS Lambda:

  • Pricing: Lambda is priced based on the number of requests and the duration your code runs. Duration is billed in 1 ms increments, metered according to the amount of memory you allocate.
  • Deployment: Lambda functions are deployed using the AWS Lambda console, the AWS Lambda API, or the AWS CLI.
  • Scaling: Lambda automatically scales your functions based on demand. This means that you only pay for the compute time you use.
  • Security: Lambda provides a variety of security features, including VPC integration and API Gateway authentication.
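As a rough illustration of that duration-plus-requests pricing model, the sketch below uses example us-east-1 x86 figures; the prices are assumptions, so check the current AWS pricing page:

```python
# Illustrative cost estimate for Lambda's duration + request pricing model.
# Prices are example us-east-1 x86 figures; check current AWS pricing.
PRICE_PER_GB_SECOND = 0.0000166667
PRICE_PER_MILLION_REQUESTS = 0.20

def monthly_cost(invocations, avg_duration_ms, memory_mb):
    # Duration is metered in GB-seconds: seconds of runtime scaled by memory.
    gb_seconds = invocations * (avg_duration_ms / 1000) * (memory_mb / 1024)
    duration_cost = gb_seconds * PRICE_PER_GB_SECOND
    request_cost = (invocations / 1_000_000) * PRICE_PER_MILLION_REQUESTS
    return duration_cost + request_cost

# 1M invocations, 200 ms average duration, 512 MB memory:
print(round(monthly_cost(1_000_000, 200, 512), 2))  # → 1.87
```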
Python
def lambda_handler(event, context):
  # Get the message from the event
  message = event['message']

  # Print the message
  print(message)

  # Return the message
  return message

 This function takes two arguments: event and context. The event argument is typically a dict parsed from the JSON document describing the event that triggered the function. The context argument is not JSON; it is a context object that carries information about the Lambda execution environment.

The function first gets the message from the event object. Then, it prints the message to the console. Finally, it returns the message.

To deploy this function, you can use the AWS Lambda console, the AWS Lambda API, or the AWS CLI.

Here are some additional details about the Lambda function:

  • Runtime: The runtime for this function is Python 3 (this example was written for Python 3.7; any current Python runtime works).
  • Handler: The handler for this function is lambda_handler.
  • Environment variables: This function does not use any environment variables.
  • Layers: This function does not use any layers.

 

AWS Lambda code to process data from CloudWatch logs

This code processes a CloudWatch Logs subscription event and returns a success response. CloudWatch Logs delivers subscription data in the awslogs field of the event, base64-encoded and gzip-compressed, so the code first decodes and decompresses the payload before parsing it as JSON. It then logs the decoded log events; at that point you could store them in a database or send them to another service. Finally, the code returns a success response.

To run this code, create a Lambda function and upload the code to it. Then create a CloudWatch Logs subscription filter on a log group that invokes the function. When matching log events arrive, the function is invoked and the code runs.

import base64
import gzip
import json
import logging

def handler(event, context):

    # CloudWatch Logs subscription data arrives base64-encoded and gzipped.
    payload = base64.b64decode(event["awslogs"]["data"])
    log_data = json.loads(gzip.decompress(payload))

    # Process the CloudWatch log events.
    logging.info("Log events: %s", log_data["logEvents"])

    # Do something with the CloudWatch log events.
    # For example, you could store them in a database or send them to another service.

    return {
        "statusCode": 200,
        "body": "Processed CloudWatch log event"
    }


AWS Lambda code to process Kinesis

import base64
import json
import logging

def handler(event, context):
    # Process every Kinesis record in the batch.
    for record in event["Records"]:
        # Kinesis record data is base64-encoded.
        kinesis_record = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # Process the Kinesis record.
        logging.info("Kinesis record: %s", kinesis_record)
        # Do something with the Kinesis record.
        # For example, you could store the record in a database or send it to another service.
    return {
        "statusCode": 200,
        "body": "Processed Kinesis records"
    }




This code processes each Kinesis record in the event and returns a success response. The event parameter contains a list of records, and each record carries the base64-encoded data for a single Kinesis record, so the code decodes each record before parsing it as JSON. It then logs the record; at that point you could store it in a database or send it to another service. Finally, it returns a success response.


Here is a detailed explanation of the code:
* The `import base64` statement imports the `base64` module, which decodes the record data (Kinesis delivers record data base64-encoded).
* The `import json` statement imports the `json` module, used to parse each decoded record as a JSON object.
* The `import logging` statement imports the `logging` module, used to log messages.
* The `def handler(event, context)` statement defines the handler function, which is called when the Lambda function is invoked.
* The `json.loads(base64.b64decode(record["kinesis"]["data"]))` expression decodes and parses one record; the `for record in event["Records"]` loop applies it to every record in the batch.
* The `logging.info("Kinesis record: %s", kinesis_record)` statement logs the record at the INFO level.
* The `return {"statusCode": 200, "body": "Processed Kinesis records"}` statement returns a success response. The `statusCode` property specifies the status code of the response, and the `body` property specifies the body of the response.
To run this code, create a Lambda function and upload the code to it. Then configure the function with a Kinesis event source mapping. When records arrive on the stream, the function is invoked and the code runs.

Store multiple messages received from SNS to S3 

import json
import logging
import boto3

def handler(event, context):

    # Get the SNS messages. Note that SNS event records use the
    # capitalized keys "Sns" and "Message".
    sns_messages = []
    for record in event["Records"]:
        sns_message = json.loads(record["Sns"]["Message"])
        sns_messages.append(sns_message)

    # Create the S3 file and write all messages in a single put
    # (putting inside the loop would overwrite the object each time).
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    file_name = "messages.txt"
    bucket.Object(file_name).put(Body=json.dumps(sns_messages))

    return {
        "statusCode": 200,
        "body": "Messages stored in S3 file"
    }

 
This code gets the SNS messages from the event parameter and stores them in a single S3 file. The S3 file must be in the my-bucket bucket.

    Configure the Lambda function to trigger on SNS events.

    Test the Lambda function by sending multiple SNS messages to the topic that is configured to trigger the Lambda function.

Here is a detailed explanation of the code:

    The import json statement imports the json module. This module is used to parse the SNS messages, which are JSON objects.
    The import logging statement imports the logging module. This module is used to log messages.
    The import boto3 statement imports the boto3 module. This module is used to interact with AWS services.
    The def handler(event, context) statement defines the handler function. This function is called when the Lambda function is invoked.
    The sns_messages = [] statement creates an empty list to store the SNS messages.
    The for record in event["Records"]: statement iterates through the SNS records in the event parameter.
    The sns_message = json.loads(record["Sns"]["Message"]) statement parses the SNS message from the record (SNS event records use the capitalized keys "Sns" and "Message").
    The sns_messages.append(sns_message) statement appends the SNS message to the list of SNS messages.
    The bucket.Object(file_name).put(Body=json.dumps(sns_messages)) statement serializes the full list and writes it to the S3 object in a single put.
    The return {"statusCode": 200, "body": "Messages stored in S3 file"} statement returns a success response. The statusCode property specifies the status code of the response, and the body property specifies the body of the response.
 

AWS sample code to receive multiple messages from Kinesis and store them in S3

import base64
import json
import logging
import boto3

def handler(event, context):

    # Get the Kinesis messages. Kinesis record data is base64-encoded.
    kinesis_messages = []
    for record in event["Records"]:
        kinesis_message = json.loads(base64.b64decode(record["kinesis"]["data"]))
        kinesis_messages.append(kinesis_message)

    # Create the S3 file and write all messages in a single put
    # (putting inside the loop would overwrite the object each time).
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    file_name = "messages.txt"
    bucket.Object(file_name).put(Body=json.dumps(kinesis_messages))

    return {
        "statusCode": 200,
        "body": "Messages stored in S3 file"
    }

 

Generate dynamic content, such as personalized web pages:

 
import json
import logging
import boto3

def handler(event, context):

    # Get the user's name.
    user_name = event["queryStringParameters"]["username"]

    # Get the user's profile information. get_item returns the record
    # under the "Item" key of its response.
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("users")
    user_profile = table.get_item(Key={"username": user_name})["Item"]

    # Generate the personalized web page.
    web_page = """
<!DOCTYPE html>
<html>
<head>
<title>Personalized Web Page</title>
</head>
<body>
<h1>Welcome, {}!</h1>
<p>Your favorite color is: {}</p>
</body>
</html>
""".format(user_name, user_profile["favorite_color"])

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "text/html"},
        "body": web_page
    }


 
 AWS sample Lambda code to create and send personalized email, using data from S3
 
import json
import logging
import boto3

def handler(event, context):

    # Get the user's name.
    user_name = event["queryStringParameters"]["username"]

    # Get the user's email address. get() returns a dict whose "Body" is a
    # streaming object; read() returns the object's bytes.
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    file_name = "user_data.json"
    user_data = bucket.Object(file_name).get()["Body"].read()
    user_email = json.loads(user_data)["email"]

    # Create the email message.
    email_subject = "Welcome, {}!".format(user_name)
    email_body = """
Hi {},

Welcome to our service! We're excited to have you on board.

Thanks,
The Team
""".format(user_name)

    # Send the email message.
    ses = boto3.client("ses")
    ses.send_email(
        Source="noreply@example.com",
        Destination={"ToAddresses": [user_email]},
        Message={"Subject": email_subject, "Body": {"Text": {"Data": email_body}}}
    )

    return {
        "statusCode": 200,
        "body": "Email sent"
    }


 
 
 
AWS Lambda code to update customer records in S3:

import json
import logging
import boto3

def handler(event, context):

    username = event["queryStringParameters"]["username"]
    new_email = event["queryStringParameters"]["new_email"]

    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    file_name = "customer_records.json"

    # Get the customer record. get() returns a dict; the object's bytes are
    # read from its "Body" stream.
    customer_record = json.loads(bucket.Object(file_name).get()["Body"].read())

    # Update the customer record.
    customer_record["email"] = new_email

    # Put the updated customer record back to S3.
    bucket.Object(file_name).put(Body=json.dumps(customer_record))

    return {
        "statusCode": 200,
        "body": "Customer record updated"
    }

This code updates the customer record for the specified username with the new email address. It first reads the customer record from S3, updates the email field, and then writes the updated record back to S3.

Lambda to build microservices

import json
import logging
import boto3

def handler(event, context):

    # Get the request data.
    request_data = json.loads(event["body"])

    # Process the request.
    if request_data["action"] == "create_user":
        create_user(request_data)
    elif request_data["action"] == "update_user":
        update_user(request_data)
    elif request_data["action"] == "delete_user":
        delete_user(request_data)

    return {
        "statusCode": 200,
        "body": "Success"
    }

def create_user(request_data):

    # Create the user record.
    user_record = {
        "username": request_data["username"],
        "email": request_data["email"],
        "password": request_data["password"]
    }

    # Save the user record.
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    file_name = "users.json"
    bucket.Object(file_name).put(Body=json.dumps(user_record))

def update_user(request_data):

    # Update the user record.
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    file_name = "users.json"
    user_record = json.loads(bucket.Object(file_name).get()["Body"].read())
    user_record["username"] = request_data["username"]
    user_record["email"] = request_data["email"]
    user_record["password"] = request_data["password"]
    bucket.Object(file_name).put(Body=json.dumps(user_record))

def delete_user(request_data):

    # Delete the user record.
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    file_name = "users.json"
    bucket.Object(file_name).delete()

 

This code defines three functions: create_user(), update_user(), and delete_user(). These functions are responsible for creating, updating, and deleting user records. The code uses an S3 bucket to store the user records.

The handler() function is the main function of the Lambda function. This function is called when the Lambda function is invoked. The handler() function gets the request data from the event parameter and then calls the appropriate function to process the request.

 

Lambda function that can be used to build microservices

import json
import logging
import boto3

def handler(event, context):

    # Get the request data.
    request_data = json.loads(event["body"])

    # Process the request and return the sub-function's response.
    if request_data["action"] == "get_products":
        return get_products(request_data)
    elif request_data["action"] == "create_product":
        return create_product(request_data)
    elif request_data["action"] == "update_product":
        return update_product(request_data)
    elif request_data["action"] == "delete_product":
        return delete_product(request_data)

    return {
        "statusCode": 400,
        "body": "Unknown action"
    }

def get_products(request_data):

    # Get the product records.
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("products")
    products = table.scan()

    # Return the product records.
    return {
        "statusCode": 200,
        "body": json.dumps(products["Items"])
    }

def create_product(request_data):

    # Create the product record.
    product_record = {
        "name": request_data["name"],
        "description": request_data["description"],
        "price": request_data["price"]
    }

    # Save the product record.
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("products")
    table.put_item(Item=product_record)

    return {
        "statusCode": 200,
        "body": "Product created"
    }

def update_product(request_data):

    # Update the product record. The placeholders in UpdateExpression must be
    # bound via ExpressionAttributeValues.
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("products")
    table.update_item(
        Key={"name": request_data["name"]},
        UpdateExpression="SET description = :description, price = :price",
        ExpressionAttributeValues={
            ":description": request_data["description"],
            ":price": request_data["price"],
        },
        ReturnValues="UPDATED_NEW",
    )

    return {
        "statusCode": 200,
        "body": "Product updated"
    }

def delete_product(request_data):

    # Delete the product record.
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("products")
    table.delete_item(Key={"name": request_data["name"]})

    return {
        "statusCode": 200,
        "body": "Product deleted"
    }

This code defines four functions: get_products(), create_product(), update_product(), and delete_product(). These functions are responsible for getting, creating, updating, and deleting product records. The code uses a DynamoDB table to store the product records.

The handler() function is the main function of the Lambda function. This function is called when the Lambda function is invoked. The handler() function gets the request data from the event parameter and then calls the appropriate function to process the request.

Example 2:


 import json
import logging
import boto3

def handler(event, context):

    # Get the request data.
    request_data = json.loads(event["body"])

    # Process the request and return the sub-function's response.
    if request_data["action"] == "get_logs":
        return get_logs(request_data)
    elif request_data["action"] == "create_log_stream":
        return create_log_stream(request_data)
    elif request_data["action"] == "publish_log_event":
        return publish_log_event(request_data)

    return {
        "statusCode": 400,
        "body": "Unknown action"
    }

def get_logs(request_data):

    # Get the log events. The CloudWatch Logs API lives on the "logs"
    # client, not "cloudwatch", and uses lowerCamelCase parameter names.
    logs = boto3.client("logs")
    response = logs.get_log_events(
        LogGroupName=request_data["log_group_name"],
        LogStreamName=request_data["log_stream_name"],
        StartFromHead=True
    ) if False else logs.get_log_events(
        logGroupName=request_data["log_group_name"],
        logStreamName=request_data["log_stream_name"],
        startFromHead=True
    )

    # Return the log events.
    return {
        "statusCode": 200,
        "body": json.dumps(response["events"])
    }

def create_log_stream(request_data):

    # Create the log stream using the "logs" client.
    logs = boto3.client("logs")
    logs.create_log_stream(
        logGroupName=request_data["log_group_name"],
        logStreamName=request_data["log_stream_name"]
    )

    return {
        "statusCode": 200,
        "body": "Log stream created"
    }

def publish_log_event(request_data):

    # Publish the log event. logEvents entries use lowercase "message" and
    # "timestamp" (milliseconds since epoch) keys.
    logs = boto3.client("logs")
    logs.put_log_events(
        logGroupName=request_data["log_group_name"],
        logStreamName=request_data["log_stream_name"],
        logEvents=[
            {
                "message": request_data["message"],
                "timestamp": request_data["timestamp"]
            }
        ]
    )

    return {
        "statusCode": 200,
        "body": "Log event published"
    }

This code defines three functions: get_logs(), create_log_stream(), and publish_log_event(). These functions get log events, create a log stream, and publish log events to CloudWatch Logs.

The handler() function is the main function of the Lambda function. This function is called when the Lambda function is invoked. The handler() function gets the request data from the event parameter and then calls the appropriate function to process the request.

 Lambda function that extracts the log of a failed Glue job and sends users the line where the error is mentioned:

 

import json
import logging
import boto3

def handler(event, context):

    # Get the job run ID.
    job_run_id = event["job_run_id"]

    # Glue has no API for fetching job logs directly; a job run's driver
    # error logs are written to CloudWatch Logs in the /aws-glue/jobs/error
    # log group, in a stream named after the job run ID.
    logs = boto3.client("logs")
    response = logs.get_log_events(
        logGroupName="/aws-glue/jobs/error",
        logStreamName=job_run_id,
        startFromHead=True
    )

    # Find the first log line that mentions an error.
    error_line = None
    for log_event in response["events"]:
        if "ERROR" in log_event["message"]:
            error_line = log_event["message"]
            break

    # Send the error line to the users.
    users = event["users"]
    ses = boto3.client("ses")
    for user in users:
        logging.info("Sending log to user {}".format(user))
        ses.send_email(
            Source="noreply@example.com",
            Destination={"ToAddresses": [user]},
            Message={"Subject": "Glue job failed", "Body": {"Text": {"Data": "The Glue job failed with: {}".format(error_line)}}}
        )

    return {
        "statusCode": 200,
        "body": "Log sent"
    }

This code first gets the job run ID from the event parameter. The event parameter is a JSON object that contains the details of the failed Glue job.

The code then reads the log events for the job run from CloudWatch Logs. Glue writes a job run's driver error logs to the /aws-glue/jobs/error log group, in a stream named after the job run ID.

The code then scans the log events for the word "ERROR" and, on the first match, stores that log line in the error_line variable.

Finally, the code sends the error line to the users. It gets the list of users from the event parameter and emails each one via Amazon SES.

The handler() function is the entry point of the Lambda function and is called when the function is invoked.


  Lambda function to send failed invocations to a dead-letter queue (DLQ) instead of discarding them

 

import json
import logging
import boto3

def handler(event, context):

    # Get the error message.
    error_message = event["error"]

    # Send the error message to the DLQ.
    sqs = boto3.client("sqs")
    dlq_url = event["dlq_url"]
    sqs.send_message(
        QueueUrl=dlq_url,
        MessageBody=error_message
    )

    return {
        "statusCode": 200,
        "body": "Error sent to DLQ"
    }


This code first gets the error message from the event parameter. The event parameter is a JSON object that contains the details of the failed invocation.

The code then sends the error message to the DLQ. The DLQ is a queue that stores failed invocations. The code uses the sqs.send_message() method to send the error message to the DLQ.

The handler() function is the entry point of the Lambda function and runs when the function is invoked with a failed invocation's details.

How you can pass a SQL file as an event to a Lambda function:

  1. Create a SQL file and store it in an S3 bucket.
  2. Create a Lambda function and invoke it with an event that names the file, for example:

 

{
    "sql_file": "my-sql-file.sql"
}

 
 
Call two Glue jobs in parallel from a Lambda function
 
 To call two AWS Glue jobs in parallel from a Lambda function, you can utilize the boto3 library in Python, which provides the AWS SDK for Python. Here's an example Lambda function code that invokes two Glue jobs concurrently:
 
 
import boto3
import concurrent.futures

def invoke_glue_job(job_name):
    glue_client = boto3.client('glue')
    response = glue_client.start_job_run(JobName=job_name)
    print(f"Glue job '{job_name}' invoked. Run ID: {response['JobRunId']}")

def lambda_handler(event, context):
    # Specify the names of the Glue jobs to invoke
    job1_name = "your_glue_job1_name"
    job2_name = "your_glue_job2_name"

    # Create a thread pool to invoke the Glue jobs concurrently
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(invoke_glue_job, job1_name),
            executor.submit(invoke_glue_job, job2_name)
        ]

        # Wait for the Glue jobs to complete
        concurrent.futures.wait(futures)

    return {
        'statusCode': 200,
        'body': 'Glue jobs invoked successfully.'
    }

In this example:

  1. The boto3 library is imported to interact with AWS services.
  2. The invoke_glue_job function is defined, which takes a Glue job name as input and starts a job run using the start_job_run method of the Glue client.
  3. In the lambda_handler function, you can specify the names of the Glue jobs to invoke by replacing "your_glue_job1_name" and "your_glue_job2_name" with the actual names of your Glue jobs.
  4. A thread pool is created using concurrent.futures.ThreadPoolExecutor().
  5. Two Glue jobs are invoked concurrently using the executor.submit() method, passing the invoke_glue_job function and the respective Glue job names.
  6. The concurrent.futures.wait() method is called to wait for the Glue jobs to complete.
  7. The Lambda function returns a response indicating that the Glue jobs were invoked successfully.

Remember to ensure that your Lambda function has the necessary IAM permissions to invoke Glue jobs using the glue:StartJobRun action.
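A minimal IAM policy statement granting that permission might look like the following; the account ID and job names in the ARNs are placeholders:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "glue:StartJobRun",
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:job/your_glue_job1_name",
                "arn:aws:glue:us-east-1:123456789012:job/your_glue_job2_name"
            ]
        }
    ]
}
```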

You can deploy this Lambda function and configure it as an event trigger to invoke the Glue jobs in parallel.


 
