- Create the project from the following template:
$ sls create --template-url https://github.com/danteinc/js-cloud-native-cookbook/tree/master/ch2/replaying-events --path cncb-replaying-events
- Navigate to the cncb-replaying-events directory with cd cncb-replaying-events.
- Review the file named ./lib/replay.js with the following content:
// Command-module metadata: the command signature with its optional
// positionals, and the one-line description.
exports.command = 'replay [bucket] [prefix]';
exports.desc = 'Replay the events in [bucket] for [prefix]';
const _ = require('highland');
const lodash = require('lodash');
const aws = require('aws-sdk');
aws.config.setPromisesDependency(require('bluebird'));
exports.builder = {
bucket: {
alias: 'b',
},
prefix: {
alias: 'p',
},
function: {
alias: 'f',
},
dry: {
alias: 'd',
default: true,
type: 'boolean'
},
region: {
alias: 'r',
default: 'us-east-1'
},
}
exports.handler = (argv) => {
aws.config.logger = process.stdout;
aws.config.region = argv.region;
const s3 = new aws.S3();
const lambda = new aws.Lambda();
paginate(s3, argv)
.flatMap(obj => get(s3, argv, obj))
.flatMap(event => invoke(lambda, argv, event))
.collect()
.each(list => console.log('count:', list.length));
}
/**
 * Stream every object listed under options.bucket / options.prefix,
 * requesting one page of the listing at a time.
 *
 * @param {object} s3 - client exposing listObjects(params).promise()
 * @param {object} options - reads `bucket` and `prefix`
 * @returns a generator-backed stream that pushes each entry of data.Contents,
 *          pushes any listing error, and ends with _.nil
 */
const paginate = (s3, options) => {
  let marker = undefined;

  return _((push, next) => {
    const params = {
      Bucket: options.bucket,
      Prefix: options.prefix,
      Marker: marker // paging indicator
    };

    s3.listObjects(params).promise()
      .then(data => {
        // The key of the last entry is the marker for the next page. An empty
        // page leaves nothing to continue from, so treat it as the end.
        const last = lodash.last(data.Contents);
        marker = data.IsTruncated && last ? last.Key : undefined;

        data.Contents.forEach(obj => {
          push(null, obj);
        });
      })
      .catch(err => {
        // Clear the marker first: otherwise a failure on any page after the
        // first would leave the previous marker in place and the same page
        // would be requested again and again.
        marker = undefined;
        push(err, null);
      })
      .finally(() => {
        if (marker) { // indicates more pages
          next();
        } else {
          push(null, _.nil);
        }
      });
  });
};
/**
 * Fetch one object and stream the events it contains.
 *
 * The object body is treated as text with one JSON document per line; empty
 * lines are dropped and every remaining line is parsed.
 *
 * @param {object} s3 - client exposing getObject(params).promise()
 * @param {object} options - reads the bucket name from `bucket`, falling back
 *                           to the short alias `b`
 * @param {object} obj - listing entry; only `Key` is read
 * @returns a stream of parsed events
 */
const get = (s3, options, obj) => {
  const params = {
    // Use the same long-form option that paginate() reads, so this works even
    // when the caller's options object carries no alias keys.
    Bucket: options.bucket || options.b,
    Key: obj.Key
  };

  return _(
    s3.getObject(params).promise()
      .then(data => Buffer.from(data.Body).toString())
  )
    .split() // EOL we added in data lake recipe transformer
    .filter(line => line.length !== 0)
    .map(JSON.parse);
};
/**
 * Wrap a stored event in a single-record, Kinesis-shaped payload and invoke
 * the target function with it.
 *
 * @param {object} lambda - client exposing invoke(params).promise()
 * @param {object} options - reads `function` (target name) and `dry`
 * @param {object} event - stored event; reads `event`, `kinesisRecordMetadata`
 *                         and `firehoseRecordMetadata`
 * @returns a stream wrapping the invocation promise
 */
const invoke = (lambda, options, event) => {
  const kinesisMeta = event.kinesisRecordMetadata;

  const record = {
    kinesis: {
      partitionKey: kinesisMeta.partitionKey,
      sequenceNumber: kinesisMeta.sequenceNumber,
      data: Buffer.from(JSON.stringify(event.event)).toString('base64'),
      kinesisSchemaVersion: '1.0',
    },
    eventID: `${kinesisMeta.shardId}:${kinesisMeta.sequenceNumber}`,
    eventName: 'aws:kinesis:record',
    eventSourceARN: event.firehoseRecordMetadata.deliveryStreamArn,
    eventSource: 'aws:kinesis',
    eventVersion: '1.0',
    awsRegion: event.firehoseRecordMetadata.region,
  };

  const body = Buffer.from(JSON.stringify({ Records: [record] }));

  // A dry run takes precedence; otherwise the payload size picks the
  // invocation type.
  let invocationType;
  if (options.dry) {
    invocationType = 'DryRun';
  } else if (body.length <= 100000) {
    invocationType = 'Event';
  } else {
    invocationType = 'RequestResponse';
  }

  const request = lambda.invoke({
    FunctionName: options.function,
    InvocationType: invocationType,
    Payload: body,
  });

  return _(request.promise());
};
- Install the dependencies with npm install.
- Deploy the stack with npm run dp:lcl -- -s $MY_STAGE.
- Replay events with the following command:
$ node index.js replay -b cncb-data-lake-s3-john-bucket-396po814rlai -p john-cncb-event-stream-s1 -f cncb-replaying-events-john-listener --dry false
[AWS s3 200 0.288s 0 retries] listObjects({ Bucket: 'cncb-data-lake-s3-john-bucket-396po814rlai',
Prefix: 'john-cncb-event-stream-s1',
Marker: undefined })
[AWS s3 200 0.199s 0 retries] getObject({ Bucket: 'cncb-data-lake-s3-john-bucket-396po814rlai',
Key: 'john-cncb-event-stream-s1/2018/04/08/03/cncb-data-lake-s3-john-DeliveryStream-13N6LEC9XJ6DZ-3-2018-04-08-03-53-28-d79d6893-aa4c-4845-8964-61256ffc6496' })
[AWS lambda 202 0.199s 0 retries] invoke({ FunctionName: 'cncb-replaying-events-john-listener',
InvocationType: 'Event',
Payload: '***SensitiveInformation***' })
[AWS lambda 202 0.151s 0 retries] invoke({ FunctionName: 'cncb-replaying-events-john-listener',
InvocationType: 'Event',
Payload: '***SensitiveInformation***' })
[AWS lambda 202 0.146s 0 retries] invoke({ FunctionName: 'cncb-replaying-events-john-listener',
InvocationType: 'Event',
Payload: '***SensitiveInformation***' })
count: 3
- Take a look at the logs:
$ sls logs -f listener -r us-east-1 -s $MY_STAGE
START ...
2018-04-17 23:43:14 ... event: {"Records":[{"kinesis":{"partitionKey":"ccfd67c3-a266-4dec-9576-ae5ea228a79c","sequenceNumber":"49583337208235522365774435506752843085880683263405588482","data":"...","kinesisSchemaVersion":"1.0"},"eventID":"shardId-000000000000:49583337208235522365774435506752843085880683263405588482","eventName":"aws:kinesis:record","eventSourceARN":"arn:aws:firehose:us-east-1:123456789012:deliverystream/cncb-data-lake-s3-john-DeliveryStream-13N6LEC9XJ6DZ","eventSource":"aws:kinesis","eventVersion":"1.0","awsRegion":"us-east-1"}]}
END ...
REPORT ... Duration: 10.03 ms Billed Duration: 100 ms ... Max Memory Used: 20 MB
...
- Once you have finished, remove the stack with npm run rm:lcl -- -s $MY_STAGE.