
FraudAway integration with AWS Kinesis

Updated: Nov 8, 2023

This article is a technical guide to integrating FraudAway with AWS Kinesis, with a breakdown of each step of the integration process. As the title indicates, the tutorial primarily deals with AWS integration. However, the same integration blueprint can be seamlessly adapted to alternative components, such as open-source solutions like Kafka, or extended to other cloud platforms such as Azure and Google Cloud for pub/sub integration and function execution.


Overall architecture

Bulk offline data processing forms the foundational pillar of any robust AML solution. It involves the collection, storage, and organisation of vast amounts of transaction data, including customer profiles, transaction histories, and external data sources, allowing financial institutions to create a comprehensive digital footprint for every customer.

This article complements the overall AML offline process described in the blog post Strengthening the Financial Fortress: The Four Pillars of Anti-Money Laundering Solutions.


AML architecture blueprint

Here we showcase an example blueprint architecture based on AWS microservices, where the FraudAway rules engine is placed to detect and prevent fraudulent attempts. You can find more about the exact policy rules in another article. For simplicity's sake, we'll explore a specific data flow process that involves pushing messages to Kinesis and optimising the transfer of these records for further processing with FraudAway.


ETL to Kinesis integration

Integration steps

For this use case, we have split the implementation into the following steps:

  • prepare the Kinesis stream in AWS

  • prepare the JSON bulk file

  • send data towards Kinesis

  • attach a Lambda function to the Kinesis stream

  • invoke policy rules from Lambda towards FraudAway

Prepare the Kinesis stream in AWS

First, ensure that you have a Kinesis stream created and ready to receive data.
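If you don't have one yet, the stream can be created in the AWS console or programmatically. Below is a minimal sketch using the same aws-sdk v2 client as the rest of this article; the region, stream name, and shard count are placeholders you'll need to adapt to your environment.

const AWS = require('aws-sdk');

AWS.config.update({ region: 'your-region' });
const kinesis = new AWS.Kinesis();

kinesis.createStream(
  { StreamName: 'your-kinesis-stream', ShardCount: 1 },
  (err) => {
    if (err) {
      console.error('Error creating stream:', err);
    } else {
      console.log('Stream creation initiated; wait until its status is ACTIVE.');
    }
  }
);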

Prepare JSON Bulk file

You can find the test file below. It includes several data points which will be used to test the implementation end to end (in a real-world example, these messages would come as the outcome of the ETL process). Note that it assumes the cumulative calculation for consecutive transfers was done during the ETL process; future articles will deal with use cases where this is not the case.


sampleData.json (Download JSON • 2KB)
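For illustration, a record in the bulk file could look something like the following. This is a hypothetical shape, not the exact contents of sampleData.json; field names such as cumulativeAmount are assumptions made for this sketch.

[
  {
    "transactionId": "tx-0001",
    "customerId": "cust-42",
    "amount": 950.0,
    "currency": "EUR",
    "timestamp": "2023-11-08T10:15:00Z",
    "cumulativeAmount": 2450.0
  }
]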

Send data to the Kinesis stream

To send data to a Kinesis stream, a producer is necessary. In this sample code, the putRecord function acts as our producer, and the sendRecordsWithDelay function adds some logic on top so that we can send an arbitrary number of records with an optional delay between them.


Here is the code snippet that allows you to push the bulk file towards Kinesis:


const AWS = require('aws-sdk');
const fs = require('fs');

// configure region and credentials before instantiating the client
AWS.config.update({
  region: 'your-region',
  accessKeyId: 'your-access-key-id',
  secretAccessKey: 'your-secret-access-key',
  sessionToken: 'your-session-token',
});

const kinesis = new AWS.Kinesis();

const sampleData = JSON.parse(fs.readFileSync('sampleData.json', 'utf8'));

function putRecord(data) {
  const params = {
    // pass the raw JSON bytes; the SDK base64-encodes them on the wire
    Data: Buffer.from(JSON.stringify(data), 'utf-8'),
    PartitionKey: 'your-partition-key',
    StreamName: 'your-kinesis-stream',
  };
  kinesis.putRecord(params, (err, result) => {
    if (err) {
      console.error('Error putting record:', err);
    } else {
      console.log('Record put:', result);
    }
  });
}

function sendRecordsWithDelay(recordCount, delayInSeconds) {
  let recordsSent = 0;
  let currentIndex = 0;

  const interval = setInterval(() => {
    if (recordsSent >= recordCount) {
      clearInterval(interval);
    } else {
      // wrap around the sample file when more records are requested than it contains
      if (currentIndex >= sampleData.length) {
        currentIndex = 0;
      }
      putRecord(sampleData[currentIndex]);
      recordsSent++;
      currentIndex++;
    }
  }, delayInSeconds * 1000);
}

// Example: Send 100 records with a 0-second delay between each record
sendRecordsWithDelay(100, 0);


Attaching Lambda to the Kinesis stream

In the next step, we will create a Lambda function and attach it to the Kinesis stream:
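The attachment can be done through the AWS console (adding the Kinesis stream as a trigger on the function) or programmatically. Below is a minimal sketch using aws-sdk v2; the stream ARN and function name are placeholders, and the batch settings match the criteria discussed in the next section.

const AWS = require('aws-sdk');

AWS.config.update({ region: 'your-region' });
const lambda = new AWS.Lambda();

lambda.createEventSourceMapping(
  {
    EventSourceArn: 'arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream',
    FunctionName: 'your-lambda-function',
    StartingPosition: 'LATEST',
    BatchSize: 100,                      // invoke after 100 received records...
    MaximumBatchingWindowInSeconds: 60,  // ...or after a 60-second window
  },
  (err, data) => {
    if (err) {
      console.error('Error creating event source mapping:', err);
    } else {
      console.log('Event source mapping created:', data.UUID);
    }
  }
);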


Now let's look further into how this function is implemented:


Running a Bulk Operation Towards FraudAway

To optimise for speed, you may want to perform bulk operations towards FraudAway, which allows you to process data in larger chunks, reducing the overhead.


Here's how to set up the Lambda for bulk operation. First, select a batch of records: in this example, the batch selection criteria will be set to 100 received records or a 60-second window. This can be done via the Lambda trigger settings (or via the event source mapping sketch shown above).


In the code snippet below, you can observe two important parts:

  • Data Transformation: Perform any necessary data transformation or enrichment to ensure that the data is in the format expected by the template.

  • Bulk Rules Processing: Send selected records in bulk towards the rules engine. This minimises the number of requests made to the rules engine, resulting in more efficient processing.

const axios = require("axios");
const { env } = require("node:process");

// change depending on customer deployment
const domain = "https://example.io"

// change template depending on events
const template = "AML_Rules"

exports.handler = async (event) => {
  console.log("Handling event:", event);
  // each entry in this array will be used as input to a template run
  const transactions = [];
  for (const record of event.Records) {
    try {
      const recordData = await getRecordDataAsync(record.kinesis);
      transactions.push(recordData);
    } catch (err) {
      console.error(
        `An error occurred while decoding the record; consider pushing it to a dead-letter queue: ${err}`
      );
    }
  }
  console.log(`Decoded ${transactions.length} of ${event.Records.length} records.`);

  try {
    if (transactions.length > 0) {

      // Modify targetUrl accordingly; as an enhancement, the targetUrl could be
      // composed automatically from the incoming events using a lookup function
      const targetUrl = domain + "/rules/v1/templates/" + template + "/run";
      const payload = {
        data: transactions
      };
      // Axios configuration for basic authentication
      const axiosConfig = {
        auth: {
          username: env.username,
          password: env.password,
        },
      };

      const response = await axios.post(targetUrl, payload, axiosConfig);
      console.log(`HTTP Status Code: ${response.status}`);
      console.log("Response Data:", response.data);
    } else {
      console.log("No records/transactions to process ");
    }
  } catch (error) {
    // it would make sense to push messages/transactions that failed to process into another queue, or to retry them
    console.error("Error:", error);
  }
};

async function getRecordDataAsync(payload) {
  // Kinesis delivers record data base64-encoded; decode and parse the JSON
  const data = JSON.parse(Buffer.from(payload.data, "base64").toString("utf-8"));
  // The data is returned as an array with a single object: a template can receive
  // multiple input objects, but in our case the template needs only one input.
  // For more info, see: https://docs.waylay.io/#/api/rules/?id=running-a-template
  return [data];
}

Please note that we have left a couple of things out of this example. Most notably, we decided to always call the same rules template. In reality, you can create an additional mapping function, a kind of router that, depending on the event type or data fields in the message, calls different templates. Another option is to embed the routing within a template itself (which is exactly how this example was built) and then call different subflows from the parent template. A hypothetical router sketch is shown below.
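For illustration, such a router could be as simple as a lookup table. The event types and template names below are invented for this sketch and are not part of the example above.

// hypothetical mapping from event type to rules template
const templateByEventType = {
  wire_transfer: "AML_Rules_Wire",
  card_payment: "AML_Rules_Card",
};

function resolveTemplate(transaction) {
  // fall back to the generic template when the event type is unknown
  return templateByEventType[transaction.eventType] || "AML_Rules";
}

With a lookup like this, the handler above would group transactions by template and compose the targetUrl per group, rather than using a fixed template constant.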



Conclusion


Efficient data flow from the ETL process to FraudAway is essential for bulk data processing. This process can be further enhanced with reports, analytics, and case management (which will be covered in another article). As mentioned earlier, this article can serve as a guideline and can be tailored further to your needs.


As you embark on your integration journey, remember to consult our documentation and explore more advanced features and optimisations to tailor the process to your specific use case.






