Event Tracking and Analytics via Ruby on Rails, DynamoDB (with Streams), Kinesis Firehose and Athena and CloudWatch Dashboard!

Andrew Brown 🇨🇦 - Aug 11 '19 - - Dev Community

p.s. This is not an end-to-end guide, I documented my journey and figured I would publish with what I had time to document instead of vaulting this knowledge in our private Knowledgebase. Then I happen to put a tech talk together so between the video and content below I hope it helps you create your own Event Tracking and Analytics on AWS.

AWS SDK Initializer

Since we only need DynamoDB add to your Gemfile:

gem 'aws-sdk-dynamodb'
Enter fullscreen mode Exit fullscreen mode

To make it easier to work with the SDK I have in an initializer RAILS_ROOT/config/intializers/aws.rb

You will notice I am aggressively setting credentials. The SDK is supposed to pick up these Environment Variables implicitly but I found in practice it did not when I wrote this. Maybe you don't have to be as verbose here like me here.

creds = Aws::Credentials.new(
   ENV['AWS_ACCESS_KEY_ID'],
   ENV['AWS_SECRET_ACCESS_KEY']
 )
Aws.config.update credentials: creds

module DynamoDB
  def self.resource
    @@dynamodb ||= Aws::DynamoDB::Resource.new({
     region: 'us-east-1',
     credentials: Aws::Credentials.new(
       ENV['AWS_ACCESS_KEY_ID'],
       ENV['AWS_SECRET_ACCESS_KEY']
    )})
    @@dynamodb
  end
end

Enter fullscreen mode Exit fullscreen mode

Probably should be storing the region as an Environment Variable in Figaro

When we want to use DynamoDB all we have to do is the following:

DynamoDB.resource.client.put_item({
  # ...
})
Enter fullscreen mode Exit fullscreen mode

Primary and Sort

Very unique ids such as User IDs make good Primary keys since its better for distribution across partitions.

Dates make very good Sort keys. Your table when queried will be stored ASC based on your SORT key. Explore the DynamoDB table explorer so you have an idea of the limitations of how you can filter.

Notice for Primary we only have the ability to do = and for Sort we have many options.

There are more advanced filter options in the documentation if you can make sense of it.

Tracker

First I define how I want to use my tracker before writing a module.
So this would write to the Dynamo

Putting data:

      Tracker::Put.event({
        user_id:    user.id,
        event_type: 'login',
        user_agent: request.user_agent,
        ip_address: request.remote_ip
      })
Enter fullscreen mode Exit fullscreen mode

Getting Data

    @recent_activity = Tracker::Get.recent_activity @model.id
Enter fullscreen mode Exit fullscreen mode

For the Putting data probably want to put this in an ActiveJob since it's possible having these event calls littered throughout your application can cause the code to block resulting in latency experienced by your team. I think DynamoDB blocks as it waits for a response even though we don't need one.

I created a new module in my lib directory eg. RAILS_ROOT/lib/tracker.rb

# This class is responsible for writing event data
# to various DynamoDB tables and fetching that data
# for display.

module Tracker
  class Entity
    include ActiveModel::Validations

    def initialize(opts={})
      opts.each { |k,v| instance_variable_set("@#{k}", v) }
    end

    attr_accessor :user_id,
                  :event_type,
                  :user_agent,
                  :ip_address,
                  :event_at,
                  :event_id


    validates :user_id        , presence: true, numericality: { only_integer: true }
    validates :event_type     , presence: true, inclusion: { in: %w(
  login
  material-view
)}
    validates :user_agent , presence: true
    validates :ip_address , presence: true
    validates :event_at   , presence: true

    def event_at
      @event_at || Time.now.iso8601
    end
  end

  class Put
    def self.event attrs={}
      entity = Tracker::Entity.new attrs
      unless entity.valid?
        raise ArgumentError, "Tracker Entity invalid permissions"
      end

      DynamoDB.resource.client.put_item({
        item: {
          'user_id'    => entity.user_id,
          'ip_address' => entity.ip_address,
          'user_agent' => entity.user_agent,
          'event_id'   => entity.event_id,
          'event_type' => entity.event_type,
          'event_at'   => Time.now.iso8601 # sort key
        },
        # We don't care about returning cosumed capactiy
        # We can handle looking event tracking data and
        # don't need to be alerted.
        return_consumed_capacity: 'NONE',
        table_name: 'exampro-events'
      })
    end
  end ## Put

  class Get
    def self.recent_activity user_id
      result =
      DynamoDB.resource.client.query({
        expression_attribute_values: {
          ":user_id"    => user_id
        },
        # https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.KeyConditionExpressions
        key_condition_expression: "user_id = :user_id",
        limit: 50,
        projection_expression: 'ip_address,event_type,event_at,user_agent', # select statement
        scan_index_forward: false, # descending order
        table_name: 'exampro-events'
      }).items
      result.each do |t|
        t['event_at']   = DateTime.parse(t['event_at'])
        unless t['user_agent'].blank?
          t['user_agent'] = DeviceDetector.new(t['user_agent'])
        end
      end
      result
    end

    def self.logins user_id, event_type
      result =
      DynamoDB.resource.client.query({
        expression_attribute_values: {
          ":user_id"    => user_id,
          ":event_type" => event_type
        },
        key_condition_expression: "user_id = :user_id",
        filter_expression: "event_type = :event_type",
        limit: 10,
        projection_expression: 'ip_address,event_type,event_at,user_agent', # select statement
        scan_index_forward: false, # descending order
        table_name: 'exampro-events'
      }).items
      result.each do |t|
        t['event_at']   = DateTime.parse(t['event_at'])
        unless t['user_agent'].blank?
          t['user_agent'] = DeviceDetector.new(t['user_agent'])
        end
      end
      result
    end
  end ## Get
end
Enter fullscreen mode Exit fullscreen mode

Tracker::Entity

I have this Entity class. Its purpose is to validate the format of arguments. I would probably enrich this further in the future with a metadata attribute.

Tracker::Put

I have a class for Put which for writing to DynamoDB. Currently, I only have one method but may in the future add more.

Tracker::Get

I have another class called Get which queries data from DyanmoDB

DateTime as String

Another thing to note is that I am converting the time to a string Time.now.iso8601. DynamoDB does not have a DateTime datatype.

This StackOverflow does a good explaining what to consider when choosing what format to use your dates.

I care about readability so ISO 8601 is a good format.
I don't care about using TTL (Time to live) since I don't need to expire records from my DynamoDB to prune the DB.
You have DynamoDB only stream TTL events which is interesting.
What matters most is when filtering the date I can use the BETWEEN to filter between two ranges.

scan_index_forward

We are using scan_index_forward: false to change the sort to be DESC instead of ASC.

projection_expression

We only want specific attributes returned from the database so thati s the purposes of:
projection_expression: 'ip_address,event_type,event_at,user_agent'

return_consumed_capacity

We are using return_consumed_capacity: 'NONE' because I don't care about getting a response back. If there was a capacity issue I have an alarm where I would take action. Since this is event data I don't are some event tracking is dropped.

DeviceDetector

We are passing our user_agent through DeviceDetector gem eg.
DeviceDetector.new(t['user_agent'])

It so in our dashboard for our app I can get human readable values such as if they are on a phone/desktop, windows/mac or using a specific web browser.

DynamoDB

Enabling DyanmoDB Streams

We are going to need to turn on DynamoDB streams.

To have streams trigger a lambda under the Triggers tab we will add an existing function. You may need to click more to find this Triggers tab.

When a record is inserted into DynamoDB. Streams will allow us to pass the puts in batches to a Lambda function.

We only want New Images. I believe a record it first put its an "Old Image" and does not contain all data. Then when all data is written it is a "New Image".

We will leave it for batches of 100. This doesn't mean the Streams will wait until it has 100 records to send but can send up to 100 at a time.

We can see our Lambda is attached. If an error occurs on this Lambda sometimes its smart to check here to find out at a glance if the Lambda is failing.

Here we can see the records in our DynamoDB table

We need to create a policy which allows Lambda to accept data from a specific DynamoDB Stream.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords"
            ],
            "Resource": "arn:aws:dynamodb:us-east-1:ACCOUNT-ID:table/exampro-events/stream/2019-06-30T11:17:05.770"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "dynamodb:ListStreams",
            "Resource": "*"
        }
    ]
}
Enter fullscreen mode Exit fullscreen mode

We need to allow our lambda function to stream data to our Kinesis Firehose

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": "arn:aws:firehose:us-east-1:ACCOUNT-ID:deliverystream/exampro-events"
        }
    ]
}
Enter fullscreen mode Exit fullscreen mode

Then I attach these two new policies to a role which is then attached to my Lambda function.

Lambda that streams data from DynamoDB to Firehose

Since DynamoDB Streams can deliver data in batches we are going to use the put_record_batch

We need to supply the delivery_stream_name. Probably should place this in Environment Variables instead of how I'm hardcoding here.

Even though we are never going to update DynamoDB records we are going to only publish events to stream for INSERT

require 'json'
require 'aws-sdk-firehose'

def lambda_handler(event:, context:)
  records = []
  event['Records'].each do |t|
    if t['eventName'] == 'INSERT'
      records.push({data: {
        user_id:  t['dynamodb']['NewImage']['user_id']['N'],
        event_at: t['dynamodb']['NewImage']['event_at']['S'],
        event_id: t['dynamodb']['NewImage']['event_id']['N'],
        event_type: t['dynamodb']['NewImage']['event_type']['S'],
        ip_address: t['dynamodb']['NewImage']['ip_address']['S'],
        user_agent: t['dynamodb']['NewImage']['user_agent']['S']
      }.to_json + "\n" })
    end
  end
  json = {records_size: records.size}.to_json
  puts json
  unless records.size.zero?
    firehose = Aws::Firehose::Resource.new
    resp = firehose.client.put_record_batch({
      delivery_stream_name: "exampro-events", # required
      records: records
    })
    json = {failed_put_count: resp.failed_put_count}.to_json
    puts json
  end

  return true
end

Enter fullscreen mode Exit fullscreen mode

Json records on newline

You will notice I am adding a new line at then of our json string.

.to_json + "\n"

This is very important because when Athena reads our json files it expects each json record to be on its own line. If they are all on one line it will read only one record.

Json Log Events

Notice that I am converting my hash to json and then using puts to log it. This is how you log Json events so we can then use a Metric Filter for later. You cannot just puts a hash, you have to convert it to json.

  json = {records_size: records.size}.to_json
  puts json
Enter fullscreen mode Exit fullscreen mode

SDK vs KPL

If you're wondering why I'm not using KPL (Kinesis Producer Library) I could have but I would have had to use a Java Lambda and its configuration is more complicated. KPL is more efficient but for our use-case we don't need to KPL. You can read more about KPL in the documentation

Metric Filter

Based on the Filter and Pattern Syntax under Publishing Numerical Values Found in Log Entries we can select an attribute of a JSON Log Event and then log it.

So for the metric filter, we want to filter json log events with an attribute of records_size greater than 0

{ $.records_size > 0 }
Enter fullscreen mode Exit fullscreen mode

For the metric value, we will supply the attribute we want it to then collect

$.records_size
Enter fullscreen mode Exit fullscreen mode

Define Metric Filter

View created metric filter

You cannot add a Metric Filter to your Cloudwatch Dashboard until data has been published to it.

How to find metric filter after its created

If you ever need to find this filter metric its shows up under Logs as a column in the logs table.

Kinesis Firehose

Pricing

Kinesis Firehose is incredibly affordable at $0.029/GB so 500 GB = $14 USD. Other Kinesis can have a very expensive base cost.

But what about Kinesis Data Analytics?

You will see there is another AWS Kinesis service called Kinesis Data Analytics and you make think you nee this expensive service based on its name.

Kinesis Data Analytics lets you run queries (SQL) on incoming streaming data. I am thinking that Kinesis Data Analytics might be faster at proactivity producing real-time analytics because it crunches data as it comes in.

Using Firehose we just dump out data to S3. When someone needs to see an up to date dashboard we can just query Athena with a Lamba function, dump the results back into DynamoDB or maybe as a json file and then display that to the user. We can decide to only generate new analytics only if the last version compiled is out of date by say 5 mins.

Creating Firehose

The dashboard is a bit confusing so you look where I created my Firehose stream.

We could transform our data via Kinesis but for us, this is not necessary since we can apply our transformation prior Lambda and we do. If you have data coming from multiple sources you may want this lambda to normalize the data as guarantee its consistent. Since we only ingest data from one lambda function this is a minimal risk for us.

I have this option set to disabled but I just wanted to show you then the data can be transformed by Glue into Parquet files which are much more performant when using Athena. This is not a pain point for us currently so we are going to leave the data as is which is json. Also, I didn't feel like calculating the cost of Glue here at scale.

I had read somewhere in the docs that compression was needed for encryption in a specific use-case. When I used Glue create table using a crawler on snappy compression it produced a bizarre schema so I rolled back on this and just encrypted using KMS.

Since I am storing IP addresses I consider this sensitive data. We run Macie, so uncertain if it would alert on this if unencrypted.

The reason we collect IP Addresses for our click event data is to detect abnormal behaviour of a user. Such as account sharing, scraping or etc.

Athena

We need a database and table.

Database and Table via Glue Catalog and Glue Crawler

This is one way for you to create your Database and table.
So create a database. I am not going to recommend this way but showing you it can be done.

We will also need a table. We could easily define the columns manually but if we already have data in our S3 bucket we can just use a crawler once to determine that schema for us. So you choose your datastore being the s3 bucket and it does the rest.

If we check out table it should have determined our schema.

Database and Athena via SQL

When using Glue via automatic cralwer it would guess the wrong column types and did not partition based on date. We can just create what we need directly in Athena.

Create our database

CREATE DATABASE exampro_events
LOCATION 's3://exampro-events/';
Enter fullscreen mode Exit fullscreen mode

And now create the table

CREATE EXTERNAL TABLE exampro_events.events (
  user_id    INT,
  event_at   STRING,
  event_id   INT,
  event_type STRING,
  user_agent STRING,
  ip_address STRING
) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('paths' = 'user_id,event_at,event_id,event_type,user_agent,ip_address')
LOCATION 's3://exampro-events/';
Enter fullscreen mode Exit fullscreen mode

Ensure the location ends with a forward slash or you'll get an error about the path.

ROW FORMAT SERDE tells it the data will be in JSON format.

A SerDe (Serializer/Deserializer) is a way in which Athena interacts with data in various formats.

Notice that for event_at I set it as STRING instead of TIMESTAMP. iso8601 is not the correct format for date, and we could change all our code to comply though since Athena has this sql function from_iso8601_timestamp I'm not concerned unless I run into a performance or limitations on the ability to query.

Athena expects this format: 2008-09-15 03:04:05.324

Partitions

You can partition your tables on things such as date eg. Year 2020. This might be something I want to do in the future but for the time being, I am ignoring partitions.

Querying in Athena

To get started click on the ellipses beside the table and Preview Table. It will create the query and show you some data so you can save yourself the trouble to type all this yourself.

Writing Athena queries can be a painful experience even with prior SQL knowledge. Read the docs to help you learn the SQL syntax

CloudWatch Dashboard

If something goes wrong we want to have a CloudWatch Dashboard to gain some insight.

We are going to add a widget

Here we can see our custom Metric. If you don't see it here its because data has yet to ever be collected so ensure data is being logged and your metric filter is correctly filtered.

So there is our record-size. The other filter is just an old test one.

So here is my line graph. I don't know how useful it is but just getting something in here. Remember to Save dashboard !!!!

In DynamoDB there is the metric which could be useful to compare against the records which could be filtered in our Lambda.

Added a few more widgets.
We can see how many records are streaming, how many records the lambda passes to Firehose, how many incoming records were received, and how many were delivered to S3. Still missing Athena. We will get there.

Fake Data via Rake Command

I wanted some login data for the past 7 days so I can compose my Athena query to group logins per day for the week.

Rake commands are great for this. Also, I suppose you could test your read/write capacity using this method.

require 'faker'
namespace :track do
  namespace :put do
    task :login  => :environment do
      50.times.each do |t|
        ip_address = Faker::Internet.public_ip_v4_address
        user_agent = Faker::Internet.user_agent
        event_at   = rand(1..7).days.ago.iso8601

        v = [0..4].sample
        Tracker::Put.event({
          user_id: 1,
          event_type: 'login',
          user_agent: user_agent,
          ip_address: ip_address,
          event_at: event_at
        })

        puts "#{ip_address} - #{user_agent} - #{event_at}"
        sleep 0.2 # sleep 1/5th of a second
      end # x.times
    end #login
  end #put

  namespace :get do
    task :logins  => :environment do
      results = Tracker::Get.logins 1
      puts results
    end #logins
  end #get
end # track
Enter fullscreen mode Exit fullscreen mode

So here I am running my rake command to create logins:

~/Sites/exampro-projects/exampro[master]: rake track:put:login
11.174.250.238 - Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A - 2019-06-29T16:46:23Z
143.251.23.90 - Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-23T16:46:24Z
57.161.250.74 - Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko - 2019-06-29T16:46:24Z
170.128.151.22 - Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-29T16:46:24Z
65.166.116.179 - Mozilla/5.0 (Windows NT x.y; Win64; x64; rv:10.0) Gecko/20100101 Firefox/10.0 - 2019-06-29T16:46:25Z
54.85.94.162 - Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko - 2019-06-24T16:46:25Z
56.33.98.190 - Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-23T16:46:25Z
139.173.42.58 - Mozilla/5.0 (Windows NT x.y; Win64; x64; rv:10.0) Gecko/20100101 Firefox/10.0 - 2019-06-29T16:46:25Z
107.234.132.121 - Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts
Enter fullscreen mode Exit fullscreen mode
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .