app.ts
//Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//SPDX-License-Identifier: MIT-0
/**
 * Lambda handler for the TypeScript kinesis-lambda-dynamodb pattern.
 * The handler accepts a Kinesis event whose records carry a base64-encoded JSON object in the data property.
 * The DynamoDB table to write to is passed via the environment variable "PROCESSED_RECORDS_TABLE_NAME".
 */
import { KinesisStreamEvent, KinesisStreamRecord } from 'aws-lambda';
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
    BatchWriteCommand,
    DynamoDBDocumentClient,
} from "@aws-sdk/lib-dynamodb";
const ddbClient = new DynamoDBClient({});
const ddbDocumentClient = DynamoDBDocumentClient.from(ddbClient);
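// The document client accepts plain JavaScript objects, so ProcessedRecord items
// can be written without manual AttributeValue marshalling.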
// ProcessedRecord is the item type stored in the DynamoDB table
export type ProcessedRecord = {
    PK: string,
    SK: string,
}
// Record type expected in the Kinesis Data Stream data payload
export type UnprocessedRecord = {
    batch: string,
    id: string,
}
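// Example decoded payload (illustrative values only): {"batch": "batch-001", "id": "record-042"}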
export const lambdaHandler = async (event: KinesisStreamEvent): Promise<void> => {
    // Getting the DynamoDB table name from the environment variable
    const dynamoDBTableName = process.env.PROCESSED_RECORDS_TABLE_NAME;
    if (!dynamoDBTableName) {
        throw new Error('Environment variable PROCESSED_RECORDS_TABLE_NAME is not set');
    }
    let itemBatch: ProcessedRecord[] = [];
    const batchPromises: Promise<void>[] = [];
    for (let index = 0; index < event.Records.length; index++) {
        const item = createRecordItem(event.Records[index]);
        itemBatch.push(item);
        const isLastItem = index === event.Records.length - 1;
        // DDB BatchWriteItem is limited to 25 items per request
        if (isLastItem || itemBatch.length === 25) {
            // store the current batch in the DDB table and reset itemBatch
            batchPromises.push(storeBatchInTable(itemBatch, dynamoDBTableName));
            itemBatch = [];
        }
    }
    await Promise.all(batchPromises);
};
const createRecordItem = (record: KinesisStreamRecord): ProcessedRecord => {
    // Kinesis record data arrives base64-encoded; decode it before parsing
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
    const data = JSON.parse(payload) as UnprocessedRecord;
    return {
        PK: data.batch,
        SK: data.id,
    };
};
const storeBatchInTable = async (records: ProcessedRecord[], tableName: string): Promise<void> => {
    const writeCommand = new BatchWriteCommand({
        RequestItems: {
            [tableName]: records.map((r) => ({
                PutRequest: { Item: r },
            })),
        },
    });
    try {
        const response = await ddbDocumentClient.send(writeCommand);
        // Note: any UnprocessedItems returned in the response are only logged, not retried
        console.log(response);
    } catch (e) {
        // Errors are logged and swallowed, so a failed batch does not fail the invocation
        console.error(e);
    }
};
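// ---------------------------------------------------------------------------
// Local smoke-test sketch (commented out; not part of the handler). This is a
// minimal, assumed way to exercise lambdaHandler without deploying: build a
// KinesisStreamEvent whose kinesis.data field is the base64-encoded JSON
// payload the handler expects. The table name and record values below are
// placeholders, and the event is cast because only the fields the handler
// reads are populated.
//
// process.env.PROCESSED_RECORDS_TABLE_NAME = 'ProcessedRecordsTable';
//
// const payload = Buffer.from(
//     JSON.stringify({ batch: 'batch-001', id: 'record-042' })
// ).toString('base64');
//
// const testEvent = {
//     Records: [{ kinesis: { data: payload } }],
// } as unknown as KinesisStreamEvent;
//
// lambdaHandler(testEvent).then(() => console.log('smoke test complete'));
// ---------------------------------------------------------------------------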