S3 Event Notifications in NiFi
The desire to pull newly uploaded files from storage is common, and typically the ListFile -> FetchFile pattern has been used in NiFi to monitor for, and then pull, new files as they arrive. Taking NiFi in to the cloud means that we’re often working with cloud object storage, like Amazon S3. While we can use a similar pattern with ListS3 -> FetchS3Object, we could instead build a more robust flow by reacting to Amazon S3 Event Notifications.
This feature enables S3 to publish messages to an Amazon SNS topic to notify downstream services of changes in the S3 bucket. This way, rather than having to actively maintain a state of the directory in NiFi, we can just wait to be told that something has changed, and choose how we repond to it.
The Event Notification feature writes to SNS and, while NiFi can poll SNS topics with GetSNS, it’s recommended to have SNS persist messages in to an SQS queue. By doing this, we can have many downstream applications (or just many flows) that can all consume the same messages independently - and we get more options to handle failures.
So, the basic architecture of this goes something like:
Sample Data Files
For this example I will be using 2 sample files.
File 1 - sample-data1.json
[
{
"color": "red",
"value": "#f00"
},
{
"color": "green",
"value": "#0f0"
},
{
"color": "blue",
"value": "#00f"
}
]
File 2 - sample-data2.json
[
{
"color": "magenta",
"value": "#f0f"
},
{
"color": "yellow",
"value": "#ff0"
},
{
"color": "black",
"value": "#000"
}
]
Setting up AWS
Start by creating an S3 bucket.
Then create a new standard SQS Queue.
Next, create a new standard SNS Topic. Once created, add your S3 bucket to the access policy for the SNS Topic. For this, the below policy is enough. Be sure to replace
{
"Version": "2012-10-17",
"Id": "example-ID",
"Statement": [
{
"Sid": "example-statement-ID",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": ["SNS:Publish"],
"Resource": "<sns topic ARN>",
"Condition": {
"ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:<bucket name>" },
"StringEquals": { "aws:SourceAccount": "<account ID>" }
}
}
]
}
Now add a new subscription to the SNS Topic, which will look like this.
Once created, you need to go back to the SQS Queue and find the subscription in the SNS Subscriptions tab and confirm the subscription - tick the SNS item, click the big orange Subscribe to Amazon SNS topic button, select the SNS resource and confirm.
Now go back to the S3 bucket you created, and click on the Properties tab. Scroll down to Event Notifications. Create a new event notification, set a name and select the operations you want to be notified about - for this example, I will select All object create events.
Lastly, take a note of these details:
- SQS Queue URL (go to the SQS Queue in AWS and you will see the URL in the details)
- IAM Role Access Key ID & Secret Access Key (If you don’t already have one, go to AWS IAM and generate an Access Key for your IAM user)
- The AWS Region you created your resources in (e.g. EU Ireland)
Further information here on the AWS docs.
Setting up NiFi
If you don’t already have NiFi installed, I have a guide on installing NiFi 1.14.0.
Add a new Process Group to contain our flow, I’ll call this one ‘s3-event-notification-in-nifi’. Right click the new Process Group and click Configure. Under Process group parameter context select Create new parameter context. Give the parameter context a name. Add 3 parameters with the details from AWS:
- SQS Queue URL
- Access Key ID (tick Yes for Senstive Property)
- Secret Access Key (tick Yes for Senstive Property)
Drag in a GetSQS processor to the root flow and configure the following properties (we use the parameters we just created)
- Queue URL = #{SQS Queue URL}
- Access Key ID = #{Access Key ID}
- Secret Access Key = #{Secret Access Key}
- Region = pick the region you used in AWS
Drag in an EvaluateJsonPath. Set the following properties:
- Destination = flowfile-content.
Add a new dynamic property:
- message = $.Message
The Event Notification message contains a key called Message which contains an escaped JSON object string. Here replace the content of the FlowFile with this JSON object (now un-escaped).
Drag in another EvaluateJsonPath. Set the following properties:
- Destination = flowfile-attribute
Add a the following dynamic properties:
- bucket = $.Records[0].s3.bucket.name
- filename = $.Records[0].s3.object.key
Drag in a FetchS3Object. Set the following properties:
- Access Key ID = #{Access Key ID}
- Secret Access Key = #{Secret Access Key}
- Bucket = ${bucket}
- Object Key = ${filename}
Drag in a Funnel. Connect the processors up like this:
GetSQS (success) > EvaluateJSONPath (1, matched) > EvaluateJSONPath (2, matched) > FetchS3Object (success) > Funnel
Start all of the processors.
Now go back to the S3 bucket in AWS and upload the two sample json files.
Return to NiFi and wait. The SQS notification will usually take a few seconds to arrive. The Funnel at the end of the flow will end up with 2 FlowFiles containing the contents of the 2 sample files.
We’re done!
This is an alternative strategy to using the List > Fetch pattern in NiFi, often called an ‘Event Driven Fetch’ pattern.