Writing the integration test

The test covers the entire DAG and will run the application in embedded mode. In embedded mode, all operators and containers share the JUnit JVM. Containers are threads (instead of separate processes) but the data flow still behaves as if operators lived in separate processes. This means operators execute asynchronously as they would in a distributed cluster and data is transferred over the loopback interface (if that's how the streams are configured).
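To make the threading model more concrete, here is a minimal sketch in plain Java of two "operators" running as threads inside one JVM and exchanging data through a queue. It is only an analogy and not how Apex implements embedded mode: the class name ThreadedPipelineSketch, the method countWords, and the END marker are all hypothetical, and an in-memory queue stands in for the stream.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; none of these names come from Apex.
class ThreadedPipelineSketch {
    // Assumed end-of-stream marker; a real word equal to it would end the stream early.
    private static final String END = "\u0000END";

    static Map<String, Integer> countWords(String... words) {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        Map<String, Integer> counts = new HashMap<>();

        // Upstream "operator": emits words into the stream, then the marker.
        Thread reader = new Thread(() -> {
            for (String w : words) {
                stream.add(w);
            }
            stream.add(END);
        }, "reader");

        // Downstream "operator": consumes words as they arrive, on its own thread.
        Thread counter = new Thread(() -> {
            try {
                for (String w = stream.take(); !w.equals(END); w = stream.take()) {
                    counts.merge(w, 1, Integer::sum);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "counter");

        counter.start();
        reader.start();
        try {
            // The caller has to wait: results only exist once both threads are done.
            reader.join();
            counter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("apex", "dag", "apex"));
    }
}
```

The point of the sketch is that the caller cannot read the counts immediately after starting the threads; it has to wait for them to finish. The test that follows is in the same position, which is why it polls for the result.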

@Test
public void testApplication() throws Exception {
  EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
  Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
  launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
  Configuration conf = new Configuration(false);
  conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
  conf.addResource(this.getClass().getResourceAsStream("/wordcounttest-properties.xml"));
  File resultFile = new File("./target/wordcountresult_4.0");
  if (resultFile.exists() && !resultFile.delete()) {
    throw new AssertionError("Failed to delete " + resultFile);
  }
  AppHandle appHandle = launcher.launchApp(new Application(), conf, launchAttributes);
  long timeoutMillis = System.currentTimeMillis() + 10000;
  while (!appHandle.isFinished() && System.currentTimeMillis() < timeoutMillis) {
    Thread.sleep(500);
    if (resultFile.exists() && resultFile.length() > 0) {
      break;
    }
  }
  appHandle.shutdown(ShutdownMode.KILL);
  Assert.assertTrue(resultFile.exists() && resultFile.length() > 0);
  String result = FileUtils.readFileToString(resultFile);
  Assert.assertTrue(result, result.contains("MyFirstApplication=5"));
}

The flow of the test is as follows:

  1. The test driver creates a launcher for embedded mode.
  2. It loads the configuration from the standard location, together with the overrides that are needed for the test.
  3. It makes sure that the result area is clean: any residue from previous runs is removed as part of the test initialization.
  4. It instantiates the application and launches it with the embedded cluster, keeping the returned handle to check and control the asynchronous execution.
  5. In a loop, it checks whether the application has terminated and stops waiting once the expected results are in. In this case, the logic checks for the expected result file, and the timeout ensures that the test does not turn into a runaway process if the application fails to produce the expected results.
  6. After the results are available, check the details with assertions.
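The clean-up, polling, and verification steps do not depend on Apex and can be sketched as a small helper in plain Java. This is an illustrative sketch, not code from the application: the class ResultFilePoller and its methods are hypothetical names, and a background thread that writes a file after a short delay stands in for the asynchronously running application.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical helper; the names are illustrative and not part of Apex or JUnit.
class ResultFilePoller {

    // Step 3: remove leftovers from an earlier run and fail loudly if that is impossible.
    static void cleanResultFile(File f) {
        if (f.exists() && !f.delete()) {
            throw new AssertionError("Failed to delete " + f);
        }
    }

    // Step 5: poll until the file exists and is non-empty, or the timeout expires.
    static boolean waitForNonEmptyFile(File f, long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (f.exists() && f.length() > 0) {
                return true;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One last check so a result that arrived during the final sleep is not missed.
        return f.exists() && f.length() > 0;
    }

    static void writeText(File f, String text) {
        try {
            Files.write(f.toPath(), text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String readText(File f) {
        try {
            return new String(Files.readAllBytes(f.toPath()), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File result = new File(System.getProperty("java.io.tmpdir"), "poller-demo-result.txt");
        cleanResultFile(result);
        // Stand-in for the application: produces the result file after a delay.
        Thread app = new Thread(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            writeText(result, "MyFirstApplication=5");
        });
        app.start();
        boolean ready = waitForNonEmptyFile(result, 5000, 100);
        System.out.println("result ready: " + ready);
        // Step 6: inspect the details only after the result is known to exist.
        if (ready) {
            System.out.println("contains expected count: "
                + readText(result).contains("MyFirstApplication=5"));
        }
        cleanResultFile(result);
    }
}
```

Returning a boolean instead of throwing on timeout leaves the decision to the caller, so a test can shut down the application first and only then fail with an assertion.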

The test completes within a few seconds and can easily be run within the IDE whenever a change is made. At this stage, there is no need to package the application, launch it on a Hadoop cluster, and diagnose issues by collecting logs and other information from distributed workers. Instead, when a test fails, it is usually straightforward to identify the cause and, where appropriate, step through the code with a debugger.
