001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.coprocessor;
019
020import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
021import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
022
023import com.google.protobuf.ByteString;
024import com.google.protobuf.Message;
025import com.google.protobuf.RpcCallback;
026import com.google.protobuf.RpcController;
027
028import java.io.Closeable;
029import java.io.IOException;
030import java.nio.ByteBuffer;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.Map;
034import java.util.NavigableMap;
035import java.util.NavigableSet;
036import java.util.TreeMap;
037import java.util.concurrent.atomic.AtomicLong;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.ConnectionFactory;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
050import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
051import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
052import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
053import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.Pair;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060/**
061 * This client class is for invoking the aggregate functions deployed on the
062 * Region Server side via the AggregateService. This class will implement the
063 * supporting functionality for summing/processing the individual results
064 * obtained from the AggregateService for each region.
065 * <p>
066 * This will serve as the client side handler for invoking the aggregate
067 * functions.
068 * For all aggregate functions,
069 * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
072 * <li>Column family can't be null. In case where multiple families are
073 * provided, an IOException will be thrown. An optional column qualifier can
074 * also be defined.</li>
 * <li>The methods to find maximum and minimum return the cell value type
 * {@code R}, and sum returns the promoted type {@code S}. Average and std
 * return a double value. Row count returns a long value.</li>
078 * </ul>
079 * <p>Call {@link #close()} when done.
080 */
081@InterfaceAudience.Public
082public class AggregationClient implements Closeable {
  // TODO: This class is not used.  Move to examples?
  /** Logger for this class. */
  private static final Logger log = LoggerFactory.getLogger(AggregationClient.class);
  /**
   * Connection opened by the constructor and used by the {@code TableName} overloads to
   * obtain {@link Table} instances; released by {@link #close()}.
   */
  private final Connection connection;
086
087  /**
088   * An RpcController implementation for use here in this endpoint.
089   */
090  static class AggregationClientRpcController implements RpcController {
091    private String errorText;
092    private boolean cancelled = false;
093    private boolean failed = false;
094
095    @Override
096    public String errorText() {
097      return this.errorText;
098    }
099
100    @Override
101    public boolean failed() {
102      return this.failed;
103    }
104
105    @Override
106    public boolean isCanceled() {
107      return this.cancelled;
108    }
109
110    @Override
111    public void notifyOnCancel(RpcCallback<Object> arg0) {
112      throw new UnsupportedOperationException();
113    }
114
115    @Override
116    public void reset() {
117      this.errorText = null;
118      this.cancelled = false;
119      this.failed = false;
120    }
121
122    @Override
123    public void setFailed(String errorText) {
124      this.failed = true;
125      this.errorText = errorText;
126    }
127
128    @Override
129    public void startCancel() {
130      this.cancelled = true;
131    }
132  }
133
134  /**
135   * Constructor with Conf object
136   * @param cfg Configuration to use
137   */
138  public AggregationClient(Configuration cfg) {
139    try {
140      // Create a connection on construction. Will use it making each of the calls below.
141      this.connection = ConnectionFactory.createConnection(cfg);
142    } catch (IOException e) {
143      throw new RuntimeException(e);
144    }
145  }
146
147  @Override
148  public void close() throws IOException {
149    if (this.connection != null && !this.connection.isClosed()) {
150      this.connection.close();
151    }
152  }
153
154  /**
155   * It gives the maximum value of a column for a given column family for the
156   * given range. In case qualifier is null, a max of all values for the given
157   * family is returned.
158   * @param tableName the name of the table to scan
159   * @param ci the user's ColumnInterpreter implementation
160   * @param scan the HBase scan object to use to read data from HBase
161   * @return max val &lt;R&gt;
162   * @throws Throwable The caller is supposed to handle the exception as they are thrown
163   *           &amp; propagated to it.
164   */
165  public <R, S, P extends Message, Q extends Message, T extends Message> R max(
166          final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
167          throws Throwable {
168    try (Table table = connection.getTable(tableName)) {
169      return max(table, ci, scan);
170    }
171  }
172
173  /**
174   * It gives the maximum value of a column for a given column family for the
175   * given range. In case qualifier is null, a max of all values for the given
176   * family is returned.
177   * @param table table to scan.
178   * @param ci the user's ColumnInterpreter implementation
179   * @param scan the HBase scan object to use to read data from HBase
180   * @return max val &lt;&gt;
181   * @throws Throwable The caller is supposed to handle the exception as they are thrown
182   *           &amp; propagated to it.
183   */
184  public <R, S, P extends Message, Q extends Message, T extends Message>
185    R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
186          throws Throwable {
187    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
188    class MaxCallBack implements Batch.Callback<R> {
189      R max = null;
190
191      R getMax() {
192        return max;
193      }
194
195      @Override
196      public synchronized void update(byte[] region, byte[] row, R result) {
197        max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
198      }
199    }
200    MaxCallBack aMaxCallBack = new MaxCallBack();
201    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
202        new Batch.Call<AggregateService, R>() {
203          @Override
204          public R call(AggregateService instance) throws IOException {
205            RpcController controller = new AggregationClientRpcController();
206            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
207                new CoprocessorRpcUtils.BlockingRpcCallback<>();
208            instance.getMax(controller, requestArg, rpcCallback);
209            AggregateResponse response = rpcCallback.get();
210            if (controller.failed()) {
211              throw new IOException(controller.errorText());
212            }
213            if (response.getFirstPartCount() > 0) {
214              ByteString b = response.getFirstPart(0);
215              Q q = getParsedGenericInstance(ci.getClass(), 3, b);
216              return ci.getCellValueFromProto(q);
217            }
218            return null;
219          }
220        }, aMaxCallBack);
221    return aMaxCallBack.getMax();
222  }
223
224  /**
225   * It gives the minimum value of a column for a given column family for the
226   * given range. In case qualifier is null, a min of all values for the given
227   * family is returned.
228   * @param tableName the name of the table to scan
229   * @param ci the user's ColumnInterpreter implementation
230   * @param scan the HBase scan object to use to read data from HBase
231   * @return min val &lt;R&gt;
232   * @throws Throwable The caller is supposed to handle the exception as they are thrown
233   *           &amp; propagated to it.
234   */
235  public <R, S, P extends Message, Q extends Message, T extends Message> R min(
236          final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
237          throws Throwable {
238    try (Table table = connection.getTable(tableName)) {
239      return min(table, ci, scan);
240    }
241  }
242
243  /**
244   * It gives the minimum value of a column for a given column family for the
245   * given range. In case qualifier is null, a min of all values for the given
246   * family is returned.
247   * @param table table to scan.
248   * @param ci the user's ColumnInterpreter implementation
249   * @param scan the HBase scan object to use to read data from HBase
250   * @return min val &lt;R&gt;
251   * @throws Throwable The caller is supposed to handle the exception as they are thrown
252   *           &amp; propagated to it.
253   */
254  public <R, S, P extends Message, Q extends Message, T extends Message>
255    R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
256          throws Throwable {
257    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
258    class MinCallBack implements Batch.Callback<R> {
259      private R min = null;
260
261      public R getMinimum() {
262        return min;
263      }
264
265      @Override
266      public synchronized void update(byte[] region, byte[] row, R result) {
267        min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
268      }
269    }
270
271    MinCallBack minCallBack = new MinCallBack();
272    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
273        new Batch.Call<AggregateService, R>() {
274          @Override
275          public R call(AggregateService instance) throws IOException {
276            RpcController controller = new AggregationClientRpcController();
277            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
278                new CoprocessorRpcUtils.BlockingRpcCallback<>();
279            instance.getMin(controller, requestArg, rpcCallback);
280            AggregateResponse response = rpcCallback.get();
281            if (controller.failed()) {
282              throw new IOException(controller.errorText());
283            }
284            if (response.getFirstPartCount() > 0) {
285              ByteString b = response.getFirstPart(0);
286              Q q = getParsedGenericInstance(ci.getClass(), 3, b);
287              return ci.getCellValueFromProto(q);
288            }
289            return null;
290          }
291        }, minCallBack);
292    log.debug("Min fom all regions is: " + minCallBack.getMinimum());
293    return minCallBack.getMinimum();
294  }
295
296  /**
297   * It gives the row count, by summing up the individual results obtained from
298   * regions. In case the qualifier is null, FirstKeyValueFilter is used to
299   * optimised the operation. In case qualifier is provided, I can't use the
300   * filter as it may set the flag to skip to next row, but the value read is
301   * not of the given filter: in this case, this particular row will not be
302   * counted ==&gt; an error.
303   * @param tableName the name of the table to scan
304   * @param ci the user's ColumnInterpreter implementation
305   * @param scan the HBase scan object to use to read data from HBase
306   * @return &lt;R, S&gt;
307   * @throws Throwable The caller is supposed to handle the exception as they are thrown
308   *           &amp; propagated to it.
309   */
310  public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
311          final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
312          throws Throwable {
313    try (Table table = connection.getTable(tableName)) {
314      return rowCount(table, ci, scan);
315    }
316  }
317
318  /**
319   * It gives the row count, by summing up the individual results obtained from
320   * regions. In case the qualifier is null, FirstKeyValueFilter is used to
321   * optimised the operation. In case qualifier is provided, I can't use the
322   * filter as it may set the flag to skip to next row, but the value read is
323   * not of the given filter: in this case, this particular row will not be
324   * counted ==&gt; an error.
325   * @param table table to scan.
326   * @param ci the user's ColumnInterpreter implementation
327   * @param scan the HBase scan object to use to read data from HBase
328   * @return &lt;R, S&gt;
329   * @throws Throwable The caller is supposed to handle the exception as they are thrown
330   *           &amp; propagated to it.
331   */
332  public <R, S, P extends Message, Q extends Message, T extends Message>
333    long rowCount(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
334          throws Throwable {
335    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
336    class RowNumCallback implements Batch.Callback<Long> {
337      private final AtomicLong rowCountL = new AtomicLong(0);
338
339      public long getRowNumCount() {
340        return rowCountL.get();
341      }
342
343      @Override
344      public void update(byte[] region, byte[] row, Long result) {
345        rowCountL.addAndGet(result.longValue());
346      }
347    }
348
349    RowNumCallback rowNum = new RowNumCallback();
350    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
351        new Batch.Call<AggregateService, Long>() {
352          @Override
353          public Long call(AggregateService instance) throws IOException {
354            RpcController controller = new AggregationClientRpcController();
355            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
356                new CoprocessorRpcUtils.BlockingRpcCallback<>();
357            instance.getRowNum(controller, requestArg, rpcCallback);
358            AggregateResponse response = rpcCallback.get();
359            if (controller.failed()) {
360              throw new IOException(controller.errorText());
361            }
362            byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
363            ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
364            bb.rewind();
365            return bb.getLong();
366          }
367        }, rowNum);
368    return rowNum.getRowNumCount();
369  }
370
371  /**
372   * It sums up the value returned from various regions. In case qualifier is
373   * null, summation of all the column qualifiers in the given family is done.
374   * @param tableName the name of the table to scan
375   * @param ci the user's ColumnInterpreter implementation
376   * @param scan the HBase scan object to use to read data from HBase
377   * @return sum &lt;S&gt;
378   * @throws Throwable The caller is supposed to handle the exception as they are thrown
379   *           &amp; propagated to it.
380   */
381  public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
382      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
383    throws Throwable {
384    try (Table table = connection.getTable(tableName)) {
385      return sum(table, ci, scan);
386    }
387  }
388
389  /**
390   * It sums up the value returned from various regions. In case qualifier is
391   * null, summation of all the column qualifiers in the given family is done.
392   * @param table table to scan.
393   * @param ci the user's ColumnInterpreter implementation
394   * @param scan the HBase scan object to use to read data from HBase
395   * @return sum &lt;S&gt;
396   * @throws Throwable The caller is supposed to handle the exception as they are thrown
397   *           &amp; propagated to it.
398   */
399  public <R, S, P extends Message, Q extends Message, T extends Message>
400    S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
401          throws Throwable {
402    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
403
404    class SumCallBack implements Batch.Callback<S> {
405      S sumVal = null;
406
407      public S getSumResult() {
408        return sumVal;
409      }
410
411      @Override
412      public synchronized void update(byte[] region, byte[] row, S result) {
413        sumVal = ci.add(sumVal, result);
414      }
415    }
416    SumCallBack sumCallBack = new SumCallBack();
417    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
418        new Batch.Call<AggregateService, S>() {
419          @Override
420          public S call(AggregateService instance) throws IOException {
421            RpcController controller = new AggregationClientRpcController();
422            // Not sure what is going on here why I have to do these casts. TODO.
423            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
424                new CoprocessorRpcUtils.BlockingRpcCallback<>();
425            instance.getSum(controller, requestArg, rpcCallback);
426            AggregateResponse response = rpcCallback.get();
427            if (controller.failed()) {
428              throw new IOException(controller.errorText());
429            }
430            if (response.getFirstPartCount() == 0) {
431              return null;
432            }
433            ByteString b = response.getFirstPart(0);
434            T t = getParsedGenericInstance(ci.getClass(), 4, b);
435            S s = ci.getPromotedValueFromProto(t);
436            return s;
437          }
438        }, sumCallBack);
439    return sumCallBack.getSumResult();
440  }
441
442  /**
443   * It computes average while fetching sum and row count from all the
444   * corresponding regions. Approach is to compute a global sum of region level
445   * sum and rowcount and then compute the average.
446   * @param tableName the name of the table to scan
447   * @param scan the HBase scan object to use to read data from HBase
448   * @throws Throwable The caller is supposed to handle the exception as they are thrown
449   *           &amp; propagated to it.
450   */
451  private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
452      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
453      throws Throwable {
454    try (Table table = connection.getTable(tableName)) {
455      return getAvgArgs(table, ci, scan);
456    }
457  }
458
459  /**
460   * It computes average while fetching sum and row count from all the
461   * corresponding regions. Approach is to compute a global sum of region level
462   * sum and rowcount and then compute the average.
463   * @param table table to scan.
464   * @param scan the HBase scan object to use to read data from HBase
465   * @throws Throwable The caller is supposed to handle the exception as they are thrown
466   *           &amp; propagated to it.
467   */
468  private <R, S, P extends Message, Q extends Message, T extends Message>
469    Pair<S, Long> getAvgArgs(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
470          final Scan scan) throws Throwable {
471    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
472    class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
473      S sum = null;
474      Long rowCount = 0L;
475
476      public synchronized Pair<S, Long> getAvgArgs() {
477        return new Pair<>(sum, rowCount);
478      }
479
480      @Override
481      public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
482        sum = ci.add(sum, result.getFirst());
483        rowCount += result.getSecond();
484      }
485    }
486
487    AvgCallBack avgCallBack = new AvgCallBack();
488    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
489        new Batch.Call<AggregateService, Pair<S, Long>>() {
490          @Override
491          public Pair<S, Long> call(AggregateService instance) throws IOException {
492            RpcController controller = new AggregationClientRpcController();
493            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
494                new CoprocessorRpcUtils.BlockingRpcCallback<>();
495            instance.getAvg(controller, requestArg, rpcCallback);
496            AggregateResponse response = rpcCallback.get();
497            if (controller.failed()) {
498              throw new IOException(controller.errorText());
499            }
500            Pair<S, Long> pair = new Pair<>(null, 0L);
501            if (response.getFirstPartCount() == 0) {
502              return pair;
503            }
504            ByteString b = response.getFirstPart(0);
505            T t = getParsedGenericInstance(ci.getClass(), 4, b);
506            S s = ci.getPromotedValueFromProto(t);
507            pair.setFirst(s);
508            ByteBuffer bb = ByteBuffer.allocate(8).put(
509                getBytesFromResponse(response.getSecondPart()));
510            bb.rewind();
511            pair.setSecond(bb.getLong());
512            return pair;
513          }
514        }, avgCallBack);
515    return avgCallBack.getAvgArgs();
516  }
517
518  /**
519   * This is the client side interface/handle for calling the average method for
520   * a given cf-cq combination. It was necessary to add one more call stack as
521   * its return type should be a decimal value, irrespective of what
522   * columninterpreter says. So, this methods collects the necessary parameters
523   * to compute the average and returs the double value.
524   * @param tableName the name of the table to scan
525   * @param ci the user's ColumnInterpreter implementation
526   * @param scan the HBase scan object to use to read data from HBase
527   * @return &lt;R, S&gt;
528   * @throws Throwable The caller is supposed to handle the exception as they are thrown
529   *           &amp; propagated to it.
530   */
531  public <R, S, P extends Message, Q extends Message, T extends Message>
532    double avg(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
533          Scan scan) throws Throwable {
534    Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
535    return ci.divideForAvg(p.getFirst(), p.getSecond());
536  }
537
538  /**
539   * This is the client side interface/handle for calling the average method for
540   * a given cf-cq combination. It was necessary to add one more call stack as
541   * its return type should be a decimal value, irrespective of what
542   * columninterpreter says. So, this methods collects the necessary parameters
543   * to compute the average and returs the double value.
544   * @param table table to scan.
545   * @param ci the user's ColumnInterpreter implementation
546   * @param scan the HBase scan object to use to read data from HBase
547   * @return &lt;R, S&gt;
548   * @throws Throwable The caller is supposed to handle the exception as they are thrown
549   *           &amp; propagated to it.
550   */
551  public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
552          final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan)
553          throws Throwable {
554    Pair<S, Long> p = getAvgArgs(table, ci, scan);
555    return ci.divideForAvg(p.getFirst(), p.getSecond());
556  }
557
558  /**
559   * It computes a global standard deviation for a given column and its value.
560   * Standard deviation is square root of (average of squares -
561   * average*average). From individual regions, it obtains sum, square sum and
562   * number of rows. With these, the above values are computed to get the global
563   * std.
564   * @param table table to scan.
565   * @param scan the HBase scan object to use to read data from HBase
566   * @return standard deviations
567   * @throws Throwable The caller is supposed to handle the exception as they are thrown
568   *           &amp; propagated to it.
569   */
570  private <R, S, P extends Message, Q extends Message, T extends Message>
571    Pair<List<S>, Long> getStdArgs(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
572          final Scan scan) throws Throwable {
573    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
574    class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
575      long rowCountVal = 0L;
576      S sumVal = null, sumSqVal = null;
577
578      public synchronized Pair<List<S>, Long> getStdParams() {
579        List<S> l = new ArrayList<>(2);
580        l.add(sumVal);
581        l.add(sumSqVal);
582        Pair<List<S>, Long> p = new Pair<>(l, rowCountVal);
583        return p;
584      }
585
586      @Override
587      public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
588        if (result.getFirst().size() > 0) {
589          sumVal = ci.add(sumVal, result.getFirst().get(0));
590          sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
591          rowCountVal += result.getSecond();
592        }
593      }
594    }
595
596    StdCallback stdCallback = new StdCallback();
597    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
598        new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
599          @Override
600          public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
601            RpcController controller = new AggregationClientRpcController();
602            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
603                new CoprocessorRpcUtils.BlockingRpcCallback<>();
604            instance.getStd(controller, requestArg, rpcCallback);
605            AggregateResponse response = rpcCallback.get();
606            if (controller.failed()) {
607              throw new IOException(controller.errorText());
608            }
609            Pair<List<S>, Long> pair = new Pair<>(new ArrayList<>(), 0L);
610            if (response.getFirstPartCount() == 0) {
611              return pair;
612            }
613            List<S> list = new ArrayList<>();
614            for (int i = 0; i < response.getFirstPartCount(); i++) {
615              ByteString b = response.getFirstPart(i);
616              T t = getParsedGenericInstance(ci.getClass(), 4, b);
617              S s = ci.getPromotedValueFromProto(t);
618              list.add(s);
619            }
620            pair.setFirst(list);
621            ByteBuffer bb = ByteBuffer.allocate(8).put(
622                getBytesFromResponse(response.getSecondPart()));
623            bb.rewind();
624            pair.setSecond(bb.getLong());
625            return pair;
626          }
627        }, stdCallback);
628    return stdCallback.getStdParams();
629  }
630
631  /**
632   * This is the client side interface/handle for calling the std method for a
633   * given cf-cq combination. It was necessary to add one more call stack as its
634   * return type should be a decimal value, irrespective of what
635   * columninterpreter says. So, this methods collects the necessary parameters
636   * to compute the std and returns the double value.
637   * @param tableName the name of the table to scan
638   * @param ci the user's ColumnInterpreter implementation
639   * @param scan the HBase scan object to use to read data from HBase
640   * @return &lt;R, S&gt;
641   * @throws Throwable The caller is supposed to handle the exception as they are thrown
642   *           &amp; propagated to it.
643   */
644  public <R, S, P extends Message, Q extends Message, T extends Message>
645    double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
646      Scan scan) throws Throwable {
647    try (Table table = connection.getTable(tableName)) {
648      return std(table, ci, scan);
649    }
650  }
651
652  /**
653   * This is the client side interface/handle for calling the std method for a
654   * given cf-cq combination. It was necessary to add one more call stack as its
655   * return type should be a decimal value, irrespective of what
656   * columninterpreter says. So, this methods collects the necessary parameters
657   * to compute the std and returns the double value.
658   * @param table table to scan.
659   * @param ci the user's ColumnInterpreter implementation
660   * @param scan the HBase scan object to use to read data from HBase
661   * @return &lt;R, S&gt;
662   * @throws Throwable The caller is supposed to handle the exception as they are thrown
663   *           &amp; propagated to it.
664   */
665  public <R, S, P extends Message, Q extends Message, T extends Message> double std(
666      final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
667    Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
668    double res = 0d;
669    double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
670    double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
671    res = avgOfSumSq - (avg) * (avg); // variance
672    res = Math.pow(res, 0.5);
673    return res;
674  }
675
676  /**
677   * It helps locate the region with median for a given column whose weight
678   * is specified in an optional column.
679   * From individual regions, it obtains sum of values and sum of weights.
680   * @param table table to scan.
681   * @param ci the user's ColumnInterpreter implementation
682   * @param scan the HBase scan object to use to read data from HBase
683   * @return pair whose first element is a map between start row of the region
684   *   and (sum of values, sum of weights) for the region, the second element is
685   *   (sum of values, sum of weights) for all the regions chosen
686   * @throws Throwable The caller is supposed to handle the exception as they are thrown
687   *           &amp; propagated to it.
688   */
689  private <R, S, P extends Message, Q extends Message, T extends Message>
690    Pair<NavigableMap<byte[], List<S>>, List<S>>
691    getMedianArgs(final Table table,
692      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
693    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
694    final NavigableMap<byte[], List<S>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
695    class StdCallback implements Batch.Callback<List<S>> {
696      S sumVal = null, sumWeights = null;
697
698      public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
699        List<S> l = new ArrayList<>(2);
700        l.add(sumVal);
701        l.add(sumWeights);
702        Pair<NavigableMap<byte[], List<S>>, List<S>> p = new Pair<>(map, l);
703        return p;
704      }
705
706      @Override
707      public synchronized void update(byte[] region, byte[] row, List<S> result) {
708        map.put(row, result);
709        sumVal = ci.add(sumVal, result.get(0));
710        sumWeights = ci.add(sumWeights, result.get(1));
711      }
712    }
713    StdCallback stdCallback = new StdCallback();
714    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
715        new Batch.Call<AggregateService, List<S>>() {
716          @Override
717          public List<S> call(AggregateService instance) throws IOException {
718            RpcController controller = new AggregationClientRpcController();
719            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
720                new CoprocessorRpcUtils.BlockingRpcCallback<>();
721            instance.getMedian(controller, requestArg, rpcCallback);
722            AggregateResponse response = rpcCallback.get();
723            if (controller.failed()) {
724              throw new IOException(controller.errorText());
725            }
726
727            List<S> list = new ArrayList<>();
728            for (int i = 0; i < response.getFirstPartCount(); i++) {
729              ByteString b = response.getFirstPart(i);
730              T t = getParsedGenericInstance(ci.getClass(), 4, b);
731              S s = ci.getPromotedValueFromProto(t);
732              list.add(s);
733            }
734            return list;
735          }
736
737        }, stdCallback);
738    return stdCallback.getMedianParams();
739  }
740
741  /**
742   * This is the client side interface/handler for calling the median method for a
743   * given cf-cq combination. This method collects the necessary parameters
744   * to compute the median and returns the median.
745   * @param tableName the name of the table to scan
746   * @param ci the user's ColumnInterpreter implementation
747   * @param scan the HBase scan object to use to read data from HBase
748   * @return R the median
749   * @throws Throwable The caller is supposed to handle the exception as they are thrown
750   *           &amp; propagated to it.
751   */
752  public <R, S, P extends Message, Q extends Message, T extends Message>
753    R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
754      Scan scan) throws Throwable {
755    try (Table table = connection.getTable(tableName)) {
756      return median(table, ci, scan);
757    }
758  }
759
760  /**
761   * This is the client side interface/handler for calling the median method for a
762   * given cf-cq combination. This method collects the necessary parameters
763   * to compute the median and returns the median.
764   * @param table table to scan.
765   * @param ci the user's ColumnInterpreter implementation
766   * @param scan the HBase scan object to use to read data from HBase
767   * @return R the median
768   * @throws Throwable The caller is supposed to handle the exception as they are thrown
769   *           &amp; propagated to it.
770   */
771  public <R, S, P extends Message, Q extends Message, T extends Message>
772    R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
773    Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
774    byte[] startRow = null;
775    byte[] colFamily = scan.getFamilies()[0];
776    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
777    NavigableMap<byte[], List<S>> map = p.getFirst();
778    S sumVal = p.getSecond().get(0);
779    S sumWeights = p.getSecond().get(1);
780    double halfSumVal = ci.divideForAvg(sumVal, 2L);
781    double movingSumVal = 0;
782    boolean weighted = false;
783    if (quals.size() > 1) {
784      weighted = true;
785      halfSumVal = ci.divideForAvg(sumWeights, 2L);
786    }
787
788    for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
789      S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
790      double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
791      if (newSumVal > halfSumVal) {
792        // we found the region with the median
793        break;
794      }
795      movingSumVal = newSumVal;
796      startRow = entry.getKey();
797    }
798    // scan the region with median and find it
799    Scan scan2 = new Scan(scan);
800    // inherit stop row from method parameter
801    if (startRow != null) {
802      scan2.setStartRow(startRow);
803    }
804    ResultScanner scanner = null;
805    try {
806      int cacheSize = scan2.getCaching();
807      if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
808        scan2.setCacheBlocks(true);
809        cacheSize = 5;
810        scan2.setCaching(cacheSize);
811      }
812      scanner = table.getScanner(scan2);
813      Result[] results = null;
814      byte[] qualifier = quals.pollFirst();
815      // qualifier for the weight column
816      byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
817      R value = null;
818      do {
819        results = scanner.next(cacheSize);
820        if (results != null && results.length > 0) {
821          for (int i = 0; i < results.length; i++) {
822            Result r = results[i];
823            // retrieve weight
824            Cell kv = r.getColumnLatestCell(colFamily, weightQualifier);
825            R newValue = ci.getValue(colFamily, weightQualifier, kv);
826            S s = ci.castToReturnType(newValue);
827            double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
828            // see if we have moved past the median
829            if (newSumVal > halfSumVal) {
830              return value;
831            }
832            movingSumVal = newSumVal;
833            kv = r.getColumnLatestCell(colFamily, qualifier);
834            value = ci.getValue(colFamily, qualifier, kv);
835          }
836        }
837      } while (results != null && results.length > 0);
838    } finally {
839      if (scanner != null) {
840        scanner.close();
841      }
842    }
843    return null;
844  }
845
846  byte[] getBytesFromResponse(ByteString response) {
847    ByteBuffer bb = response.asReadOnlyByteBuffer();
848    bb.rewind();
849    byte[] bytes;
850    if (bb.hasArray()) {
851      bytes = bb.array();
852    } else {
853      bytes = response.toByteArray();
854    }
855    return bytes;
856  }
857}