官术网_书友最值得收藏!

Application specifications

Let's start by transforming this placeholder application into an application that counts words – the Hello World equivalent for big data processing frameworks. The functionality is easy to understand and not very important, as our focus here is on the development process.

The full source code of the modified application is available at https://github.com/tweise/apex-samples/tree/master/wordcount. Here is the modified application assembly in Application.java:

@Override
public void populateDAG(DAG dag, Configuration conf)
{
LineByLineFileInputOperator lineReader = dag.addOperator("input",
new LineByLineFileInputOperator());
LineSplitter parser = dag.addOperator("parser", new LineSplitter());
UniqueCounter counter = dag.addOperator("counter", new UniqueCounter());
GenericFileOutputOperator<Object> output = dag.addOperator("output",
new GenericFileOutputOperator<>());
output.setConverter(new ToStringConverter());
dag.addStream("lines", lineReader.output, parser.input);
dag.addStream("words", parser.output, counter.data);
dag.addStream("counts", counter.count, output.input);
}

The pipeline reads from a file (LineByLineFileInputOperator), then each line is split into words (LineSplitter), then occurrences of each word are counted (UniqueCounter), and finally the result is written to the file (GenericFileOutputOperator). Apart from the LineSplitter operator, all other operators are part of the Apex library. After all the operators are added to the DAG, the pipeline is completed connecting the operator (through their ports) using addStream. This is the explicit style of composing the logical DAG (rather than using the high level API), hence the name compositional API. Note that ports must always be defined in their respective operators, and may not always be named input and output.

主站蜘蛛池模板: 孝义市| 安国市| 保康县| 凤城市| 贵阳市| 怀宁县| 苍溪县| 华池县| 海兴县| 夏津县| 墨竹工卡县| 南华县| 泗洪县| 武宁县| 增城市| 年辖:市辖区| 保靖县| 雷山县| 西宁市| 泾阳县| 仁怀市| 桐梓县| 金平| 广平县| 塘沽区| 达拉特旗| 南京市| 依安县| 达拉特旗| 新疆| 英超| 沛县| 江安县| 缙云县| 赫章县| 闽侯县| 文化| 伊宁市| 江陵县| 湘西| 韶山市|