001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.coprocessor;
019
020import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
021import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023
024import com.google.protobuf.Message;
025import java.io.IOException;
026import java.util.Map;
027import java.util.NavigableMap;
028import java.util.NavigableSet;
029import java.util.NoSuchElementException;
030import java.util.TreeMap;
031import java.util.concurrent.CompletableFuture;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
035import org.apache.hadoop.hbase.client.AsyncTable;
036import org.apache.hadoop.hbase.client.AsyncTable.CoprocessorCallback;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
041import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
042import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
043import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.ReflectionUtils;
046import org.apache.yetus.audience.InterfaceAudience;
047
048/**
049 * This client class is for invoking the aggregate functions deployed on the Region Server side via
050 * the AggregateService. This class will implement the supporting functionality for
051 * summing/processing the individual results obtained from the AggregateService for each region.
052 */
053@InterfaceAudience.Public
054public final class AsyncAggregationClient {
055  private AsyncAggregationClient() {}
056
057  private static abstract class AbstractAggregationCallback<T>
058      implements CoprocessorCallback<AggregateResponse> {
059    private final CompletableFuture<T> future;
060
061    protected boolean finished = false;
062
063    private void completeExceptionally(Throwable error) {
064      if (finished) {
065        return;
066      }
067      finished = true;
068      future.completeExceptionally(error);
069    }
070
071    protected AbstractAggregationCallback(CompletableFuture<T> future) {
072      this.future = future;
073    }
074
075    @Override
076    public synchronized void onRegionError(RegionInfo region, Throwable error) {
077      completeExceptionally(error);
078    }
079
080    @Override
081    public synchronized void onError(Throwable error) {
082      completeExceptionally(error);
083    }
084
085    protected abstract void aggregate(RegionInfo region, AggregateResponse resp)
086        throws IOException;
087
088    @Override
089    public synchronized void onRegionComplete(RegionInfo region, AggregateResponse resp) {
090      try {
091        aggregate(region, resp);
092      } catch (IOException e) {
093        completeExceptionally(e);
094      }
095    }
096
097    protected abstract T getFinalResult();
098
099    @Override
100    public synchronized void onComplete() {
101      if (finished) {
102        return;
103      }
104      finished = true;
105      future.complete(getFinalResult());
106    }
107  }
108
109  private static <R, S, P extends Message, Q extends Message, T extends Message> R
110      getCellValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp,
111          int firstPartIndex) throws IOException {
112    Q q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(firstPartIndex));
113    return ci.getCellValueFromProto(q);
114  }
115
116  private static <R, S, P extends Message, Q extends Message, T extends Message> S
117      getPromotedValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp,
118          int firstPartIndex) throws IOException {
119    T t = getParsedGenericInstance(ci.getClass(), 4, resp.getFirstPart(firstPartIndex));
120    return ci.getPromotedValueFromProto(t);
121  }
122
123  private static byte[] nullToEmpty(byte[] b) {
124    return b != null ? b : HConstants.EMPTY_BYTE_ARRAY;
125  }
126
127  public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
128      max(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
129    CompletableFuture<R> future = new CompletableFuture<>();
130    AggregateRequest req;
131    try {
132      req = validateArgAndGetPB(scan, ci, false);
133    } catch (IOException e) {
134      future.completeExceptionally(e);
135      return future;
136    }
137    AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
138
139      private R max;
140
141      @Override
142      protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
143        if (resp.getFirstPartCount() > 0) {
144          R result = getCellValueFromProto(ci, resp, 0);
145          if (max == null || (result != null && ci.compare(max, result) < 0)) {
146            max = result;
147          }
148        }
149      }
150
151      @Override
152      protected R getFinalResult() {
153        return max;
154      }
155    };
156    table
157        .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
158          (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), callback)
159        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
160        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
161    return future;
162  }
163
164  public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
165      min(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
166    CompletableFuture<R> future = new CompletableFuture<>();
167    AggregateRequest req;
168    try {
169      req = validateArgAndGetPB(scan, ci, false);
170    } catch (IOException e) {
171      future.completeExceptionally(e);
172      return future;
173    }
174
175    AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
176
177      private R min;
178
179      @Override
180      protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
181        if (resp.getFirstPartCount() > 0) {
182          R result = getCellValueFromProto(ci, resp, 0);
183          if (min == null || (result != null && ci.compare(min, result) > 0)) {
184            min = result;
185          }
186        }
187      }
188
189      @Override
190      protected R getFinalResult() {
191        return min;
192      }
193    };
194    table
195        .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
196          (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), callback)
197        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
198        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
199    return future;
200  }
201
202  public static <R, S, P extends Message, Q extends Message, T extends Message>
203      CompletableFuture<Long> rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
204          Scan scan) {
205    CompletableFuture<Long> future = new CompletableFuture<>();
206    AggregateRequest req;
207    try {
208      req = validateArgAndGetPB(scan, ci, true);
209    } catch (IOException e) {
210      future.completeExceptionally(e);
211      return future;
212    }
213    AbstractAggregationCallback<Long> callback = new AbstractAggregationCallback<Long>(future) {
214
215      private long count;
216
217      @Override
218      protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
219        count += resp.getFirstPart(0).asReadOnlyByteBuffer().getLong();
220      }
221
222      @Override
223      protected Long getFinalResult() {
224        return count;
225      }
226    };
227    table
228        .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
229          (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), callback)
230        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
231        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
232    return future;
233  }
234
235  public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S>
236      sum(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
237    CompletableFuture<S> future = new CompletableFuture<>();
238    AggregateRequest req;
239    try {
240      req = validateArgAndGetPB(scan, ci, false);
241    } catch (IOException e) {
242      future.completeExceptionally(e);
243      return future;
244    }
245    AbstractAggregationCallback<S> callback = new AbstractAggregationCallback<S>(future) {
246      private S sum;
247
248      @Override
249      protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
250        if (resp.getFirstPartCount() > 0) {
251          S s = getPromotedValueFromProto(ci, resp, 0);
252          sum = ci.add(sum, s);
253        }
254      }
255
256      @Override
257      protected S getFinalResult() {
258        return sum;
259      }
260    };
261    table
262        .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
263          (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), callback)
264        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
265        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
266    return future;
267  }
268
269  public static <R, S, P extends Message, Q extends Message, T extends Message>
270      CompletableFuture<Double> avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
271          Scan scan) {
272    CompletableFuture<Double> future = new CompletableFuture<>();
273    AggregateRequest req;
274    try {
275      req = validateArgAndGetPB(scan, ci, false);
276    } catch (IOException e) {
277      future.completeExceptionally(e);
278      return future;
279    }
280    AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) {
281      private S sum;
282
283      long count = 0L;
284
285      @Override
286      protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
287        if (resp.getFirstPartCount() > 0) {
288          sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0));
289          count += resp.getSecondPart().asReadOnlyByteBuffer().getLong();
290        }
291      }
292
293      @Override
294      protected Double getFinalResult() {
295        return ci.divideForAvg(sum, count);
296      }
297    };
298    table
299        .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
300          (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), callback)
301        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
302        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
303    return future;
304  }
305
306  public static <R, S, P extends Message, Q extends Message, T extends Message>
307      CompletableFuture<Double> std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci,
308          Scan scan) {
309    CompletableFuture<Double> future = new CompletableFuture<>();
310    AggregateRequest req;
311    try {
312      req = validateArgAndGetPB(scan, ci, false);
313    } catch (IOException e) {
314      future.completeExceptionally(e);
315      return future;
316    }
317    AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) {
318
319      private S sum;
320
321      private S sumSq;
322
323      private long count;
324
325      @Override
326      protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
327        if (resp.getFirstPartCount() > 0) {
328          sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0));
329          sumSq = ci.add(sumSq, getPromotedValueFromProto(ci, resp, 1));
330          count += resp.getSecondPart().asReadOnlyByteBuffer().getLong();
331        }
332      }
333
334      @Override
335      protected Double getFinalResult() {
336        double avg = ci.divideForAvg(sum, count);
337        double avgSq = ci.divideForAvg(sumSq, count);
338        return Math.sqrt(avgSq - avg * avg);
339      }
340    };
341    table
342        .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
343          (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), callback)
344        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
345        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
346    return future;
347  }
348
349  // the map key is the startRow of the region
350  private static <R, S, P extends Message, Q extends Message, T extends Message>
351      CompletableFuture<NavigableMap<byte[], S>>
352      sumByRegion(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
353    CompletableFuture<NavigableMap<byte[], S>> future =
354        new CompletableFuture<NavigableMap<byte[], S>>();
355    AggregateRequest req;
356    try {
357      req = validateArgAndGetPB(scan, ci, false);
358    } catch (IOException e) {
359      future.completeExceptionally(e);
360      return future;
361    }
362    int firstPartIndex = scan.getFamilyMap().get(scan.getFamilies()[0]).size() - 1;
363    AbstractAggregationCallback<NavigableMap<byte[], S>> callback =
364        new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) {
365
366      private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
367
368        @Override
369        protected void aggregate(RegionInfo region, AggregateResponse resp) throws IOException {
370          if (resp.getFirstPartCount() > 0) {
371            map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex));
372          }
373        }
374
375        @Override
376        protected NavigableMap<byte[], S> getFinalResult() {
377          return map;
378        }
379      };
380    table
381        .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
382          (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback)
383        .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
384        .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
385    return future;
386  }
387
388  private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian(
389          CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table,
390          ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) {
391    double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L);
392    S movingSum = null;
393    byte[] startRow = null;
394    for (Map.Entry<byte[], S> entry : sumByRegion.entrySet()) {
395      startRow = entry.getKey();
396      S newMovingSum = ci.add(movingSum, entry.getValue());
397      if (ci.divideForAvg(newMovingSum, 1L) > halfSum) {
398        break;
399      }
400      movingSum = newMovingSum;
401    }
402    if (startRow != null) {
403      scan.withStartRow(startRow);
404    }
405    // we can not pass movingSum directly to an anonymous class as it is not final.
406    S baseSum = movingSum;
407    byte[] family = scan.getFamilies()[0];
408    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
409    byte[] weightQualifier = qualifiers.last();
410    byte[] valueQualifier = qualifiers.first();
411    table.scan(scan, new AdvancedScanResultConsumer() {
412      private S sum = baseSum;
413
414      private R value = null;
415
416      @Override
417      public void onNext(Result[] results, ScanController controller) {
418        try {
419          for (Result result : results) {
420            Cell weightCell = result.getColumnLatestCell(family, weightQualifier);
421            R weight = ci.getValue(family, weightQualifier, weightCell);
422            sum = ci.add(sum, ci.castToReturnType(weight));
423            if (ci.divideForAvg(sum, 1L) > halfSum) {
424              if (value != null) {
425                future.complete(value);
426              } else {
427                future.completeExceptionally(new NoSuchElementException());
428              }
429              controller.terminate();
430              return;
431            }
432            Cell valueCell = result.getColumnLatestCell(family, valueQualifier);
433            value = ci.getValue(family, valueQualifier, valueCell);
434          }
435        } catch (IOException e) {
436          future.completeExceptionally(e);
437          controller.terminate();
438        }
439      }
440
441      @Override
442      public void onError(Throwable error) {
443        future.completeExceptionally(error);
444      }
445
446      @Override
447      public void onComplete() {
448        if (!future.isDone()) {
449          // we should not reach here as the future should be completed in onNext.
450          future.completeExceptionally(new NoSuchElementException());
451        }
452      }
453    });
454  }
455
456  public static <R, S, P extends Message, Q extends Message, T extends Message>
457      CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
458      ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
459    CompletableFuture<R> future = new CompletableFuture<>();
460    addListener(sumByRegion(table, ci, scan), (sumByRegion, error) -> {
461      if (error != null) {
462        future.completeExceptionally(error);
463      } else if (sumByRegion.isEmpty()) {
464        future.completeExceptionally(new NoSuchElementException());
465      } else {
466        findMedian(future, table, ci, ReflectionUtils.newInstance(scan.getClass(), scan),
467          sumByRegion);
468      }
469    });
470    return future;
471  }
472}