
How it works...

Stream processors listen for data from a streaming service such as Kinesis or DynamoDB Streams. Deploying a stream processor is completely declarative. We configure a function with a stream event and the pertinent settings, such as the type, arn, batchSize, and startingPosition. The arn is set dynamically using a CloudFormation variable, ${cf:cncb-event-stream-${opt:stage}.streamArn}, that references the output value of the cncb-event-stream stack.
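The corresponding serverless.yml fragment would look something like the following sketch. The handler name and the batchSize value of 100 are illustrative assumptions; the arn variable and startingPosition are the ones referenced above:

    functions:
      listener:
        handler: handler.listener
        events:
          - stream:
              type: kinesis
              arn: ${cf:cncb-event-stream-${opt:stage}.streamArn}
              batchSize: 100
              startingPosition: TRIM_HORIZON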


Streams are the only resources that are shared between autonomous cloud-native services.

We will discuss batch size and starting position in detail in both Chapter 8, Designing for Failure, and Chapter 9, Optimizing Performance. For now, you may have noticed that the new stream processor logged all the events that were published to the stream in the last 24 hours. This is because the startingPosition is set to TRIM_HORIZON. If it were set to LATEST, then the function would only receive events that were published after the function was created.

Stream processing is a perfect match for functional reactive programming with Node.js streams. The terminology can be a little confusing because the word stream is overloaded. I like to think of streams as either macro or micro. For example, Kinesis is the macro stream and the code in our stream processor function is the micro stream. My favorite library for implementing the micro stream is Highland.js (https://highlandjs.org). A popular alternative is RxJS (https://rxjs-dev.firebaseapp.com). As you can see in this recipe, functional reactive programming is very descriptive and readable. One of the reasons for this is that there are no loops. If you try to implement a stream processor with imperative programming, you will find that it quickly gets very messy. You also lose backpressure, which we will discuss in Chapter 8, Designing for Failure.
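To make the macro/micro distinction concrete, here is a minimal, self-contained micro-stream sketch (not code from the recipe) showing the loop-free, declarative style that Highland.js encourages:

    const _ = require('highland');

    // Wrap an ordinary array in a Highland stream and pull each value
    // through the pipeline in turn; there is no explicit loop anywhere.
    _([1, 2, 3, 4])
      .filter(x => x % 2 === 0)   // keep only the even numbers
      .map(x => x * 10)           // transform each surviving value
      .each(x => console.log(x)); // consume the stream: logs 20, then 40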

The code in the listener function creates a pipeline of steps that the data from the Kinesis stream ultimately flows through. The first step, _(event.Records), converts the array of Kinesis records into a Highland.js stream object, which allows each element in the array to be pulled through the stream in turn, as the downstream steps are ready to receive the next element. The .map(recordToEvent) step decodes the Base64-encoded data from the Kinesis record and parses the JSON into an event object. The next step, .tap(printEvent), simply logs the event so that we can see what is happening in the recipe.
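Based on that description, the first two helper functions might look like the following sketch. The r.kinesis.data field is the standard Kinesis record format; the shape of the parsed event itself is whatever the publisher wrote:

    const recordToEvent = r =>
      // Kinesis delivers the payload as a Base64-encoded blob, so decode
      // it and parse the resulting JSON string into an event object.
      JSON.parse(Buffer.from(r.kinesis.data, 'base64'));

    const printEvent = e => console.log('event: %j', e);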

Kinesis, and event streaming in general, belongs to the high-performance, dumb-pipe, smart-endpoints generation of messaging middleware. This means that Kinesis, the dumb pipe, does not waste its processing power on filtering data for the endpoints. Instead, all of that logic is spread out across the processing power of the smart endpoints. Our stream processor function is the smart endpoint. To that end, the .filter(forThingCreated) step is responsible for filtering out the events that the processor is not interested in. All the remaining steps can assume that they are receiving the expected event types.
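A filter predicate of this sort is just a pure function over the event. Assuming the events carry a type attribute, as the events in this recipe do, it might be as simple as:

    // Let only thing-created events through; everything else is dropped
    // by the .filter() step before it reaches the downstream logic.
    const forThingCreated = e => e.type === 'thing-created';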

Our bare-bones stream processor needs something somewhat interesting, but simple, to do. So, we count and print the number of thing-created events in the batch. We have filtered out all other event types, so the .collect() step collects all the remaining events into an array. Then, the .tap(printCount) step logs the length of the array. Finally, the .toCallback(cb) step invokes the callback function once all the data in the batch has been processed. At this point, the Kinesis checkpoint is advanced and the next batch of events is processed. We will cover error handling and how it relates to batches and checkpoints in Chapter 8, Designing for Failure.
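Putting the pieces together, the whole listener reduces to a single pipeline. The following sketch assembles the steps in the order described above; the printCount helper and the handler signature are assumptions based on the standard Lambda callback style:

    const _ = require('highland');

    const recordToEvent = r => JSON.parse(Buffer.from(r.kinesis.data, 'base64'));
    const printEvent = e => console.log('event: %j', e);
    const forThingCreated = e => e.type === 'thing-created';
    const printCount = events => console.log('count: %d', events.length);

    module.exports.listener = (event, context, cb) => _(event.Records)
      .map(recordToEvent)      // decode and parse each Kinesis record
      .tap(printEvent)         // log every event for visibility
      .filter(forThingCreated) // keep only thing-created events
      .collect()               // gather the survivors into a single array
      .tap(printCount)         // log how many events made it through
      .toCallback(cb);         // signal Lambda once the batch is processed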
