001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.coprocessor; 019 020import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; 021import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023 024import com.google.protobuf.Message; 025import java.io.IOException; 026import java.util.Map; 027import java.util.NavigableMap; 028import java.util.NavigableSet; 029import java.util.NoSuchElementException; 030import java.util.TreeMap; 031import java.util.concurrent.CompletableFuture; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; 035import org.apache.hadoop.hbase.client.AsyncTable; 036import org.apache.hadoop.hbase.client.AsyncTable.CoprocessorCallback; 037import org.apache.hadoop.hbase.client.RegionInfo; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; 041import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; 042import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; 043import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.util.ReflectionUtils; 046import org.apache.yetus.audience.InterfaceAudience; 047 048/** 049 * This client class is for invoking the aggregate functions deployed on the Region Server side via 050 * the AggregateService. This class will implement the supporting functionality for 051 * summing/processing the individual results obtained from the AggregateService for each region. 052 */ 053@InterfaceAudience.Public 054public final class AsyncAggregationClient { 055 private AsyncAggregationClient() {} 056 057 private static abstract class AbstractAggregationCallback<T> 058 implements CoprocessorCallback<AggregateResponse> { 059 private final CompletableFuture<T> future; 060 061 protected boolean finished = false; 062 063 private void completeExceptionally(Throwable error) { 064 if (finished) { 065 return; 066 } 067 finished = true; 068 future.completeExceptionally(error); 069 } 070 071 protected AbstractAggregationCallback(CompletableFuture<T> future) { 072 this.future = future; 073 } 074 075 @Override 076 public synchronized void onRegionError(RegionInfo region, Throwable error) { 077 completeExceptionally(error); 078 } 079 080 @Override 081 public synchronized void onError(Throwable error) { 082 completeExceptionally(error); 083 } 084 085 protected abstract void aggregate(RegionInfo region, AggregateResponse resp) 086 throws IOException; 087 088 @Override 089 public synchronized void onRegionComplete(RegionInfo region, AggregateResponse resp) { 090 try { 091 aggregate(region, resp); 092 } catch (IOException e) { 093 completeExceptionally(e); 094 } 095 } 096 097 protected abstract T getFinalResult(); 098 099 @Override 100 public synchronized void onComplete() { 101 if (finished) { 102 return; 103 } 104 finished = true; 105 future.complete(getFinalResult()); 106 } 107 } 108 109 private static <R, S, P extends Message, Q extends Message, T extends Message> R 110 getCellValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp, 111 int firstPartIndex) throws IOException { 112 Q q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(firstPartIndex)); 113 return ci.getCellValueFromProto(q); 114 } 115 116 private static <R, S, P extends Message, Q extends Message, T extends Message> S 117 getPromotedValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp, 118 int firstPartIndex) throws IOException { 119 T t = getParsedGenericInstance(ci.getClass(), 4, resp.getFirstPart(firstPartIndex)); 120 return ci.getPromotedValueFromProto(t); 121 } 122 123 private static byte[] nullToEmpty(byte[] b) { 124 return b != null ? b : HConstants.EMPTY_BYTE_ARRAY; 125 } 126 127 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> 128 max(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { 129 CompletableFuture<R> future = new CompletableFuture<>(); 130 AggregateRequest req; 131 try { 132 req = validateArgAndGetPB(scan, ci, false); 133 } catch (IOException e) { 134 future.completeExceptionally(e); 135 return future; 136 } 137 AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) { 138 139 private R max; 140 141 @Override 142 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { 143 if (resp.getFirstPartCount() > 0) { 144 R result = getCellValueFromProto(ci, resp, 0); 145 if (max == null || (result != null && ci.compare(max, result) < 0)) { 146 max = result; 147 } 148 } 149 } 150 151 @Override 152 protected R getFinalResult() { 153 return max; 154 } 155 }; 156 table 157 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, 158 (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), callback) 159 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) 160 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); 161 return future; 162 } 163 164 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R> 165 min(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { 166 CompletableFuture<R> future = new CompletableFuture<>(); 167 AggregateRequest req; 168 try { 169 req = validateArgAndGetPB(scan, ci, false); 170 } catch (IOException e) { 171 future.completeExceptionally(e); 172 return future; 173 } 174 175 AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) { 176 177 private R min; 178 179 @Override 180 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { 181 if (resp.getFirstPartCount() > 0) { 182 R result = getCellValueFromProto(ci, resp, 0); 183 if (min == null || (result != null && ci.compare(min, result) > 0)) { 184 min = result; 185 } 186 } 187 } 188 189 @Override 190 protected R getFinalResult() { 191 return min; 192 } 193 }; 194 table 195 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, 196 (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), callback) 197 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) 198 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); 199 return future; 200 } 201 202 public static <R, S, P extends Message, Q extends Message, T extends Message> 203 CompletableFuture<Long> rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, 204 Scan scan) { 205 CompletableFuture<Long> future = new CompletableFuture<>(); 206 AggregateRequest req; 207 try { 208 req = validateArgAndGetPB(scan, ci, true); 209 } catch (IOException e) { 210 future.completeExceptionally(e); 211 return future; 212 } 213 AbstractAggregationCallback<Long> callback = new AbstractAggregationCallback<Long>(future) { 214 215 private long count; 216 217 @Override 218 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { 219 count += resp.getFirstPart(0).asReadOnlyByteBuffer().getLong(); 220 } 221 222 @Override 223 protected Long getFinalResult() { 224 return count; 225 } 226 }; 227 table 228 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, 229 (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), callback) 230 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) 231 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); 232 return future; 233 } 234 235 public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S> 236 sum(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { 237 CompletableFuture<S> future = new CompletableFuture<>(); 238 AggregateRequest req; 239 try { 240 req = validateArgAndGetPB(scan, ci, false); 241 } catch (IOException e) { 242 future.completeExceptionally(e); 243 return future; 244 } 245 AbstractAggregationCallback<S> callback = new AbstractAggregationCallback<S>(future) { 246 private S sum; 247 248 @Override 249 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { 250 if (resp.getFirstPartCount() > 0) { 251 S s = getPromotedValueFromProto(ci, resp, 0); 252 sum = ci.add(sum, s); 253 } 254 } 255 256 @Override 257 protected S getFinalResult() { 258 return sum; 259 } 260 }; 261 table 262 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, 263 (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), callback) 264 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) 265 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); 266 return future; 267 } 268 269 public static <R, S, P extends Message, Q extends Message, T extends Message> 270 CompletableFuture<Double> avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, 271 Scan scan) { 272 CompletableFuture<Double> future = new CompletableFuture<>(); 273 AggregateRequest req; 274 try { 275 req = validateArgAndGetPB(scan, ci, false); 276 } catch (IOException e) { 277 future.completeExceptionally(e); 278 return future; 279 } 280 AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) { 281 private S sum; 282 283 long count = 0L; 284 285 @Override 286 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { 287 if (resp.getFirstPartCount() > 0) { 288 sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0)); 289 count += resp.getSecondPart().asReadOnlyByteBuffer().getLong(); 290 } 291 } 292 293 @Override 294 protected Double getFinalResult() { 295 return ci.divideForAvg(sum, count); 296 } 297 }; 298 table 299 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, 300 (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), callback) 301 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) 302 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); 303 return future; 304 } 305 306 public static <R, S, P extends Message, Q extends Message, T extends Message> 307 CompletableFuture<Double> std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, 308 Scan scan) { 309 CompletableFuture<Double> future = new CompletableFuture<>(); 310 AggregateRequest req; 311 try { 312 req = validateArgAndGetPB(scan, ci, false); 313 } catch (IOException e) { 314 future.completeExceptionally(e); 315 return future; 316 } 317 AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) { 318 319 private S sum; 320 321 private S sumSq; 322 323 private long count; 324 325 @Override 326 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { 327 if (resp.getFirstPartCount() > 0) { 328 sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0)); 329 sumSq = ci.add(sumSq, getPromotedValueFromProto(ci, resp, 1)); 330 count += resp.getSecondPart().asReadOnlyByteBuffer().getLong(); 331 } 332 } 333 334 @Override 335 protected Double getFinalResult() { 336 double avg = ci.divideForAvg(sum, count); 337 double avgSq = ci.divideForAvg(sumSq, count); 338 return Math.sqrt(avgSq - avg * avg); 339 } 340 }; 341 table 342 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, 343 (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), callback) 344 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) 345 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); 346 return future; 347 } 348 349 // the map key is the startRow of the region 350 private static <R, S, P extends Message, Q extends Message, T extends Message> 351 CompletableFuture<NavigableMap<byte[], S>> 352 sumByRegion(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { 353 CompletableFuture<NavigableMap<byte[], S>> future = 354 new CompletableFuture<NavigableMap<byte[], S>>(); 355 AggregateRequest req; 356 try { 357 req = validateArgAndGetPB(scan, ci, false); 358 } catch (IOException e) { 359 future.completeExceptionally(e); 360 return future; 361 } 362 int firstPartIndex = scan.getFamilyMap().get(scan.getFamilies()[0]).size() - 1; 363 AbstractAggregationCallback<NavigableMap<byte[], S>> callback = 364 new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) { 365 366 private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 367 368 @Override 369 protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException { 370 if (resp.getFirstPartCount() > 0) { 371 map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex)); 372 } 373 } 374 375 @Override 376 protected NavigableMap<byte[], S> getFinalResult() { 377 return map; 378 } 379 }; 380 table 381 .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub, 382 (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback) 383 .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow()) 384 .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute(); 385 return future; 386 } 387 388 private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian( 389 CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table, 390 ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) { 391 double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L); 392 S movingSum = null; 393 byte[] startRow = null; 394 for (Map.Entry<byte[], S> entry : sumByRegion.entrySet()) { 395 startRow = entry.getKey(); 396 S newMovingSum = ci.add(movingSum, entry.getValue()); 397 if (ci.divideForAvg(newMovingSum, 1L) > halfSum) { 398 break; 399 } 400 movingSum = newMovingSum; 401 } 402 if (startRow != null) { 403 scan.withStartRow(startRow); 404 } 405 // we can not pass movingSum directly to an anonymous class as it is not final. 406 S baseSum = movingSum; 407 byte[] family = scan.getFamilies()[0]; 408 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); 409 byte[] weightQualifier = qualifiers.last(); 410 byte[] valueQualifier = qualifiers.first(); 411 table.scan(scan, new AdvancedScanResultConsumer() { 412 private S sum = baseSum; 413 414 private R value = null; 415 416 @Override 417 public void onNext(Result[] results, ScanController controller) { 418 try { 419 for (Result result : results) { 420 Cell weightCell = result.getColumnLatestCell(family, weightQualifier); 421 R weight = ci.getValue(family, weightQualifier, weightCell); 422 sum = ci.add(sum, ci.castToReturnType(weight)); 423 if (ci.divideForAvg(sum, 1L) > halfSum) { 424 if (value != null) { 425 future.complete(value); 426 } else { 427 future.completeExceptionally(new NoSuchElementException()); 428 } 429 controller.terminate(); 430 return; 431 } 432 Cell valueCell = result.getColumnLatestCell(family, valueQualifier); 433 value = ci.getValue(family, valueQualifier, valueCell); 434 } 435 } catch (IOException e) { 436 future.completeExceptionally(e); 437 controller.terminate(); 438 } 439 } 440 441 @Override 442 public void onError(Throwable error) { 443 future.completeExceptionally(error); 444 } 445 446 @Override 447 public void onComplete() { 448 if (!future.isDone()) { 449 // we should not reach here as the future should be completed in onNext. 450 future.completeExceptionally(new NoSuchElementException()); 451 } 452 } 453 }); 454 } 455 456 public static <R, S, P extends Message, Q extends Message, T extends Message> 457 CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table, 458 ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) { 459 CompletableFuture<R> future = new CompletableFuture<>(); 460 addListener(sumByRegion(table, ci, scan), (sumByRegion, error) -> { 461 if (error != null) { 462 future.completeExceptionally(error); 463 } else if (sumByRegion.isEmpty()) { 464 future.completeExceptionally(new NoSuchElementException()); 465 } else { 466 findMedian(future, table, ci, ReflectionUtils.newInstance(scan.getClass(), scan), 467 sumByRegion); 468 } 469 }); 470 return future; 471 } 472}