当对新老版本多比对测试时,对新老版本的返回值做一部处理,可以用先分区再对返回结果进行异步处理。
rstInfo = hiveInfo.repartition(5).javaRDD().mapPartitions(new FlatMapFunction<Iterator, TestCaseResult>() { @Override public Iterator call(Iterator iterator) throws Exception { List testCaseResultList = new ArrayList(); while (iterator.hasNext()) { String request = iterator.next(); TestCaseResult testCaseResult = new TestCaseResult(); String newRequest = request; CompletableFuture future1 = CompletableFuture.supplyAsync(() -> HttpUtil.post(baseUrl, request)); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> HttpUtil.post(testUrl, request)); TestCaseResult entity = CompletableFuture.allOf(future1, future2).thenApply(r -> { //对返回值进行处理 }).join(); testCaseResultList.add(entity); } return testCaseResultList.iterator(); } });
注意,hiveInfo是Dataset格式的。