I am a massive AWS Lambda fan, especially with workflows where you respond to specific events.

In this tutorial, I will keep things basic and demonstrate how you can trigger an AWS Lambda function on an S3 PUT event, giving you a starting point to go further and build amazing things.

What will we be doing?

In this tutorial we will be converting CSV files to JSON with the help of Lambda using the Python language.

The workflow will be like this:

  • The user uploads a CSV file to S3, let's say bucket/uploads/input/*.csv
  • We then use S3 event notifications to trigger when data is uploaded to the bucket/uploads/input prefix and has a suffix of .csv
  • The event then triggers our Lambda function, which converts the CSV file and writes the JSON file to bucket/uploads/output/{year}/{month}/{day}/{timestamp}.json

You can go further than that by using S3 Lifecycle Policies to delete objects when they are older than, let's say, 30 days.
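
As a rough sketch of that lifecycle idea, the rule below expires objects under the output prefix after 30 days. The rule ID, the function names and the choice to scope the rule to uploads/output/ are assumptions for illustration, and boto3 is imported lazily so the rule can be inspected without any AWS setup.

```python
def build_expiry_rule(prefix="uploads/output/", days=30):
    """Return a lifecycle configuration that expires objects under prefix."""
    return {
        "Rules": [
            {
                "ID": "expire-converted-json",  # hypothetical rule name
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        ]
    }


def apply_expiry_rule(bucket_name, rule=None):
    """Attach the rule to the bucket using the caller's AWS credentials."""
    import boto3  # imported here so building the rule needs no AWS setup

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration=rule or build_expiry_rule(),
    )


print(build_expiry_rule())
```

Keep in mind that this call replaces the bucket's existing lifecycle configuration, so any rules already on the bucket would need to be included as well.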

Create the S3 Bucket

Head over to AWS S3 and create a New Bucket (or use an existing one):

Use a descriptive name of your choice:

Then your S3 bucket should appear in your console:

Create your Lambda Function

Head over to AWS Lambda and create a function. I will be using Python 3.7 and will be calling it csv-to-json-function:

You can then save the function as is; we will come back to the code.

S3 Events

So what we essentially want is this: whenever someone uploads an object to S3 whose key matches the prefix uploads/input and has the suffix .csv, our Lambda function should be triggered to load the CSV and convert that object to JSON.

Head over to your S3 Bucket:

Select Properties and then select Events:

You should then see:

Click Add notification and configure what needs to happen. This step is very important, as a misconfiguration here could incur a lot of costs.

We are configuring this S3 event to trigger a Lambda function when an object is created under a prefix, for example uploads/input/data.csv. If your Lambda function were to write a .csv file back to the input prefix, it would trigger itself in a loop and cost a LOT of money. So we have to make sure that our event only listens for the .csv suffix on the uploads/input prefix, and that the function writes its output somewhere else.
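
To make the loop risk concrete, here is a small sketch that mimics the prefix and suffix check locally. The function name would_trigger is made up for illustration; it only approximates the filter as a plain string match on the object key.

```python
def would_trigger(key, prefix="uploads/input", suffix=".csv"):
    """Mimic the event filter: the key must start with prefix and end with suffix."""
    return key.startswith(prefix) and key.endswith(suffix)


for key in (
    "uploads/input/data.csv",                     # an upload from the user
    "uploads/output/2020/04/08/1586375435.json",  # what the function writes
    "uploads/input/data.json",                    # right prefix, wrong suffix
):
    print(key, would_trigger(key))
```

Only the first key matches, so the JSON written to the output prefix does not re-trigger the function. Note that a prefix without a trailing slash would also match a key such as uploads/input-old/data.csv.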

Provide a name, select the PUT event, provide the prefix uploads/input as an example, then provide the suffix .csv as we only want to trigger when CSV files are uploaded, and select your Lambda function as the destination:

It should look like:
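
For readers who prefer to script this step instead of using the console, the same settings could be expressed roughly as follows with boto3. This is a sketch under assumptions: the function names are made up, the Lambda ARN is a placeholder you would supply, and the Id simply reuses the name that appears as configurationId in the event shown later in this post.

```python
def build_notification_config(lambda_arn, prefix="uploads/input", suffix=".csv"):
    """Describe a PUT-only notification filtered on key prefix and suffix."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "Id": "TriggerLambdaToConvertCsvToJson",
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:Put"],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": prefix},
                            {"Name": "suffix", "Value": suffix},
                        ]
                    }
                },
            }
        ]
    }


def apply_notification_config(bucket_name, lambda_arn):
    """Attach the notification to the bucket using the caller's credentials."""
    import boto3  # imported here so the config can be built without AWS setup

    boto3.client("s3").put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration=build_notification_config(lambda_arn),
    )
```

When scripting this, S3 also needs permission to invoke the function, which has to be granted separately on the Lambda side.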

IAM User

Now we want to create an IAM user that will upload the CSV files to S3.

Head over to IAM, select Policies, Create Policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::ruanbekker.demo.blogpost/uploads/input/*"
        }
    ]
}
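
If you want to reuse this policy for another bucket, a small helper can generate the document. The helper name and the Sid below are made up for illustration; the structure mirrors the policy above.

```python
import json


def build_upload_policy(bucket_name, prefix="uploads/input"):
    """Return a policy that only allows PutObject under the given prefix."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowCsvUploads",  # hypothetical statement id
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": "arn:aws:s3:::{b}/{p}/*".format(b=bucket_name, p=prefix),
            }
        ],
    }


print(json.dumps(build_upload_policy("ruanbekker.demo.blogpost"), indent=4))
```

The printed JSON can be pasted into the policy editor as is.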

I will call this policy s3-uploads-csv-policy. Then select Users, create a new user and tick Programmatic access:

Hit next, assign the policy to the user:

Hit Create user and make a note of your AWS access key and secret key, as the secret key is not retrievable after creation:

AKIAXXXXXXXXXXXXXXXX
ZCTfXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

Head to your terminal and configure the credentials for that user. I will configure them under the profile csv-uploader:

$ aws configure --profile csv-uploader
AWS Access Key ID [None]: AKIAXXXXXXXXXXXXXXXX
AWS Secret Access Key [None]: ZCTfXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
Default region name [None]: eu-west-1
Default output format [None]: json

Lambda Function

Let's head back to Lambda and write some code that will read the CSV file when it arrives on S3, process the file, convert it to JSON and upload it to S3 under a key named: uploads/output/{year}/{month}/{day}/{timestamp}.json
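
To illustrate the key format, here is a minimal sketch of how such a key could be built from a single point in time. The function name build_output_key is made up, and using an explicit UTC datetime is an assumption on my side to keep the result predictable.

```python
import datetime as dt


def build_output_key(now=None):
    """Build uploads/output/{year}/{month}/{day}/{timestamp}.json from one datetime."""
    now = now or dt.datetime.now(dt.timezone.utc)
    datestamp = now.strftime("%Y/%m/%d")
    timestamp = int(now.timestamp())  # seconds since the Unix epoch
    return "uploads/output/{ds}/{ts}.json".format(ds=datestamp, ts=timestamp)


example = dt.datetime(2020, 4, 8, 19, 50, 35, tzinfo=dt.timezone.utc)
print(build_output_key(example))  # uploads/output/2020/04/08/1586375435.json
```

Deriving both parts from the same datetime avoids the date and the timestamp disagreeing around midnight.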

When the S3 event triggers the Lambda function, this is what's passed as the event:

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "eu-west-1",
      "eventTime": "2020-04-08T19:50:35.471Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:x"
      },
      "requestParameters": {
        "sourceIPAddress": "102.132.218.115"
      },
      "responseElements": {
        "x-amz-request-id": "x",
        "x-amz-id-2": "x/x/x"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "TriggerLambdaToConvertCsvToJson",
        "bucket": {
          "name": "ruanbekker.demo.blogpost",
          "ownerIdentity": {
            "principalId": "x"
          },
          "arn": "arn:aws:s3:::ruanbekker.demo.blogpost"
        },
        "object": {
          "key": "uploads/input/foo.csv",
          "size": 0,
          "eTag": "x",
          "sequencer": "x"
        }
      }
    }
  ]
}

So we have context on the key name as well as the bucket name.
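
Pulling those two values out of the event could look like the sketch below. The function name extract_objects is made up. One detail to be aware of: object keys arrive URL-encoded in the event, so a space in the filename shows up as a plus sign and needs decoding before calling S3.

```python
from urllib.parse import unquote_plus


def extract_objects(event):
    """Return a (bucket, key) pair for every record in an S3 event."""
    return [
        (
            record["s3"]["bucket"]["name"],
            unquote_plus(record["s3"]["object"]["key"]),  # decode "+" and %XX
        )
        for record in event.get("Records", [])
    ]


# Trimmed-down version of the event above, keeping only the fields we read
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "ruanbekker.demo.blogpost"},
                "object": {"key": "uploads/input/foo.csv"},
            }
        }
    ]
}

print(extract_objects(sample_event))
```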

On to the code of our Lambda function. There are probably better ways to do this, such as streaming, but the focus is on what the task is doing and not really on the code.

I am reading the CSV file, writing it to the /tmp directory (the only path which is writable), converting the data to JSON and writing it to a JSON file, then uploading that file to S3 and removing the files from disk:

import csv
import datetime as dt
import json
import os
from urllib.parse import unquote_plus

import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):

    # Take one timestamp so the date prefix and the filename always agree
    now = dt.datetime.now()
    datestamp = now.strftime("%Y/%m/%d")
    timestamp = int(now.timestamp())

    filename_json = "/tmp/file_{ts}.json".format(ts=timestamp)
    filename_csv = "/tmp/file_{ts}.csv".format(ts=timestamp)
    keyname_s3 = "uploads/output/{ds}/{ts}.json".format(ds=datestamp, ts=timestamp)

    # S3 normally sends one record per event; with several records,
    # later ones would overwrite the same output key
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in the event (a space becomes "+")
        key_name = unquote_plus(record['s3']['object']['key'])

        # Download the CSV object and store it in /tmp
        s3_object = s3.get_object(Bucket=bucket_name, Key=key_name)
        contents = s3_object['Body'].read().decode('utf-8')

        with open(filename_csv, 'w') as csv_data:
            csv_data.write(contents)

        # Each CSV row becomes a dictionary keyed by the header row
        with open(filename_csv, newline='') as csv_data:
            json_data = list(csv.DictReader(csv_data))

        with open(filename_json, 'w') as json_file:
            json_file.write(json.dumps(json_data))

        # Upload the JSON file to the output prefix
        with open(filename_json, 'r') as json_file_contents:
            s3.put_object(Bucket=bucket_name, Key=keyname_s3, Body=json_file_contents.read())

        # Clean up /tmp, as it can be reused by later invocations
        os.remove(filename_csv)
        os.remove(filename_json)

    return {
        'statusCode': 200,
        'body': json.dumps('CSV converted to JSON and available at: {bucket}/{key}'.format(bucket=bucket_name,key=keyname_s3))
    }
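
As one possible alternative to going through /tmp, the conversion itself can be done entirely in memory. The sketch below is not the function used in this post, just an illustration of the same csv.DictReader approach on a string; the name csv_to_json is made up.

```python
import csv
import io
import json


def csv_to_json(csv_text):
    """Convert CSV text (with a header row) into a JSON array of objects."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return json.dumps(list(reader))


sample = "name,surname,age\nruan,bekker,33\n"
print(csv_to_json(sample))
```

Inside the handler, the returned string could be encoded with .encode('utf-8') and passed as the Body of put_object. Note that every value comes out as a string, so age is "33" rather than 33.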

Save the Lambda function.

Upload CSV to S3

Back to your terminal, create a CSV file, in my case:

$ cat > data.csv << EOF
name,surname,age,country,city
ruan,bekker,33,south africa,cape town
james,oguya,32,kenya,nairobi
stefan,bester,33,south africa,kroonstad
EOF

Now upload the data to S3 as uploads/input/foo.csv. The destination filename can be anything, as long as the prefix is uploads/input and the suffix is .csv:

$ aws --profile csv-uploader s3 cp data.csv s3://ruanbekker.demo.blogpost/uploads/input/foo.csv
upload: ./data.csv to s3://ruanbekker.demo.blogpost/uploads/input/foo.csv
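
The same upload could also be done from Python using the csv-uploader profile. This is a sketch with made-up helper names; it uses put_object so that the upload matches the s3:PutObject permission and the PUT event configured earlier.

```python
def destination_key(filename, prefix="uploads/input"):
    """Build a key that satisfies the event filter (prefix and .csv suffix)."""
    if not filename.endswith(".csv"):
        raise ValueError("only .csv files trigger the conversion")
    return "{p}/{f}".format(p=prefix, f=filename)


def upload_csv(local_path, bucket_name, filename, profile="csv-uploader"):
    """Upload a local CSV file using the named AWS CLI profile."""
    import boto3  # imported here so destination_key works without AWS setup

    session = boto3.Session(profile_name=profile)
    key = destination_key(filename)
    with open(local_path, "rb") as body:
        session.client("s3").put_object(Bucket=bucket_name, Key=key, Body=body)
    return key


print(destination_key("foo.csv"))  # uploads/input/foo.csv
```

Calling upload_csv("data.csv", "ruanbekker.demo.blogpost", "foo.csv") would then place the file at the same key as the CLI command above.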

As we can see from our Lambda Execution Logs on CloudWatch, our execution was successful:

Looking at S3, we can see our key was created as bucket/uploads/output/{year}/{month}/{day}/{timestamp}.json

After selecting our object, hitting Actions and selecting Open, we can see that our CSV file was converted to JSON (note: I pretty printed the file manually for better readability):

Thank You

This was a very basic example of what you can do with S3 Events and Lambda. The sky is the limit, and you can do awesome things!