View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client.coprocessor;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.NavigableSet;
30  import java.util.TreeMap;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.TableName;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.Connection;
42  import org.apache.hadoop.hbase.client.ConnectionFactory;
43  import org.apache.hadoop.hbase.client.Result;
44  import org.apache.hadoop.hbase.client.ResultScanner;
45  import org.apache.hadoop.hbase.client.Scan;
46  import org.apache.hadoop.hbase.client.Table;
47  import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
48  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
49  import org.apache.hadoop.hbase.ipc.ServerRpcController;
50  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
51  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
52  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
53  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.util.Pair;
56  
57  import com.google.protobuf.ByteString;
58  import com.google.protobuf.Message;
59  
/**
 * This client class is for invoking the aggregate functions deployed on the
 * Region Server side via the AggregateService. This class implements the
 * supporting functionality for summing/processing the individual results
 * obtained from the AggregateService for each region.
 * <p>
 * This serves as the client side handler for invoking the aggregate
 * functions.
 * <p>
 * For all aggregate functions:
 * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
 * <li>Column family can't be null. In case where multiple families are
 * provided, an IOException will be thrown. An optional column qualifier can
 * also be defined.</li>
 * <li>The methods that find maximum, minimum and sum return a type taken from
 * the column interpreter's type parameters. Average and std return a double
 * value. Row count returns a long value.</li>
 * </ul>
 * <p>Call {@link #close()} when done.
 */
80  @InterfaceAudience.Private
81  public class AggregationClient implements Closeable {
82    // TODO: This class is not used.  Move to examples?
83    private static final Log log = LogFactory.getLog(AggregationClient.class);
84    private final Connection connection;
85  
86    /**
87     * Constructor with Conf object
88     * @param cfg
89     */
90    public AggregationClient(Configuration cfg) {
91      try {
92        // Create a connection on construction. Will use it making each of the calls below.
93        this.connection = ConnectionFactory.createConnection(cfg);
94      } catch (IOException e) {
95        throw new RuntimeException(e);
96      }
97    }
98  
99    @Override
100   public void close() throws IOException {
101     if (this.connection != null && !this.connection.isClosed()) {
102       this.connection.close();
103     }
104   }
105 
106   /**
107    * It gives the maximum value of a column for a given column family for the
108    * given range. In case qualifier is null, a max of all values for the given
109    * family is returned.
110    * @param tableName
111    * @param ci
112    * @param scan
113    * @return max val <R>
114    * @throws Throwable
115    *           The caller is supposed to handle the exception as they are thrown
116    *           & propagated to it.
117    */
118   public <R, S, P extends Message, Q extends Message, T extends Message> R max(
119       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
120   throws Throwable {
121     try (Table table = connection.getTable(tableName)) {
122       return max(table, ci, scan);
123     }
124   }
125 
126   /**
127    * It gives the maximum value of a column for a given column family for the
128    * given range. In case qualifier is null, a max of all values for the given
129    * family is returned.
130    * @param table
131    * @param ci
132    * @param scan
133    * @return max val <R>
134    * @throws Throwable
135    *           The caller is supposed to handle the exception as they are thrown
136    *           & propagated to it.
137    */
138   public <R, S, P extends Message, Q extends Message, T extends Message>
139   R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
140       final Scan scan) throws Throwable {
141     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
142     class MaxCallBack implements Batch.Callback<R> {
143       R max = null;
144 
145       R getMax() {
146         return max;
147       }
148 
149       @Override
150       public synchronized void update(byte[] region, byte[] row, R result) {
151         max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
152       }
153     }
154     MaxCallBack aMaxCallBack = new MaxCallBack();
155     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
156         new Batch.Call<AggregateService, R>() {
157           @Override
158           public R call(AggregateService instance) throws IOException {
159             ServerRpcController controller = new ServerRpcController();
160             BlockingRpcCallback<AggregateResponse> rpcCallback =
161                 new BlockingRpcCallback<AggregateResponse>();
162             instance.getMax(controller, requestArg, rpcCallback);
163             AggregateResponse response = rpcCallback.get();
164             if (controller.failedOnException()) {
165               throw controller.getFailedOn();
166             }
167             if (response.getFirstPartCount() > 0) {
168               ByteString b = response.getFirstPart(0);
169               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
170               return ci.getCellValueFromProto(q);
171             }
172             return null;
173           }
174         }, aMaxCallBack);
175     return aMaxCallBack.getMax();
176   }
177 
178   /*
179    * @param scan
180    * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
181    */
182   private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
183     if (scan == null
184         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
185             .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
186         || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
187           !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
188       throw new IOException(
189           "Agg client Exception: Startrow should be smaller than Stoprow");
190     } else if (!canFamilyBeAbsent) {
191       if (scan.getFamilyMap().size() != 1) {
192         throw new IOException("There must be only one family.");
193       }
194     }
195   }
196 
197   /**
198    * It gives the minimum value of a column for a given column family for the
199    * given range. In case qualifier is null, a min of all values for the given
200    * family is returned.
201    * @param tableName
202    * @param ci
203    * @param scan
204    * @return min val <R>
205    * @throws Throwable
206    */
207   public <R, S, P extends Message, Q extends Message, T extends Message> R min(
208       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
209   throws Throwable {
210     try (Table table = connection.getTable(tableName)) {
211       return min(table, ci, scan);
212     }
213   }
214 
215   /**
216    * It gives the minimum value of a column for a given column family for the
217    * given range. In case qualifier is null, a min of all values for the given
218    * family is returned.
219    * @param table
220    * @param ci
221    * @param scan
222    * @return min val <R>
223    * @throws Throwable
224    */
225   public <R, S, P extends Message, Q extends Message, T extends Message>
226   R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
227       final Scan scan) throws Throwable {
228     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
229     class MinCallBack implements Batch.Callback<R> {
230 
231       private R min = null;
232 
233       public R getMinimum() {
234         return min;
235       }
236 
237       @Override
238       public synchronized void update(byte[] region, byte[] row, R result) {
239         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
240       }
241     }
242     MinCallBack minCallBack = new MinCallBack();
243     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
244         new Batch.Call<AggregateService, R>() {
245 
246           @Override
247           public R call(AggregateService instance) throws IOException {
248             ServerRpcController controller = new ServerRpcController();
249             BlockingRpcCallback<AggregateResponse> rpcCallback =
250                 new BlockingRpcCallback<AggregateResponse>();
251             instance.getMin(controller, requestArg, rpcCallback);
252             AggregateResponse response = rpcCallback.get();
253             if (controller.failedOnException()) {
254               throw controller.getFailedOn();
255             }
256             if (response.getFirstPartCount() > 0) {
257               ByteString b = response.getFirstPart(0);
258               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
259               return ci.getCellValueFromProto(q);
260             }
261             return null;
262           }
263         }, minCallBack);
264     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
265     return minCallBack.getMinimum();
266   }
267 
268   /**
269    * It gives the row count, by summing up the individual results obtained from
270    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
271    * optimised the operation. In case qualifier is provided, I can't use the
272    * filter as it may set the flag to skip to next row, but the value read is
273    * not of the given filter: in this case, this particular row will not be
274    * counted ==> an error.
275    * @param tableName
276    * @param ci
277    * @param scan
278    * @return <R, S>
279    * @throws Throwable
280    */
281   public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
282       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
283   throws Throwable {
284     try (Table table = connection.getTable(tableName)) {
285         return rowCount(table, ci, scan);
286     }
287   }
288 
289   /**
290    * It gives the row count, by summing up the individual results obtained from
291    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
292    * optimised the operation. In case qualifier is provided, I can't use the
293    * filter as it may set the flag to skip to next row, but the value read is
294    * not of the given filter: in this case, this particular row will not be
295    * counted ==> an error.
296    * @param table
297    * @param ci
298    * @param scan
299    * @return <R, S>
300    * @throws Throwable
301    */
302   public <R, S, P extends Message, Q extends Message, T extends Message>
303   long rowCount(final Table table,
304       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
305     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
306     class RowNumCallback implements Batch.Callback<Long> {
307       private final AtomicLong rowCountL = new AtomicLong(0);
308 
309       public long getRowNumCount() {
310         return rowCountL.get();
311       }
312 
313       @Override
314       public void update(byte[] region, byte[] row, Long result) {
315         rowCountL.addAndGet(result.longValue());
316       }
317     }
318     RowNumCallback rowNum = new RowNumCallback();
319     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
320         new Batch.Call<AggregateService, Long>() {
321           @Override
322           public Long call(AggregateService instance) throws IOException {
323             ServerRpcController controller = new ServerRpcController();
324             BlockingRpcCallback<AggregateResponse> rpcCallback =
325                 new BlockingRpcCallback<AggregateResponse>();
326             instance.getRowNum(controller, requestArg, rpcCallback);
327             AggregateResponse response = rpcCallback.get();
328             if (controller.failedOnException()) {
329               throw controller.getFailedOn();
330             }
331             byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
332             ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
333             bb.rewind();
334             return bb.getLong();
335           }
336         }, rowNum);
337     return rowNum.getRowNumCount();
338   }
339 
340   /**
341    * It sums up the value returned from various regions. In case qualifier is
342    * null, summation of all the column qualifiers in the given family is done.
343    * @param tableName
344    * @param ci
345    * @param scan
346    * @return sum <S>
347    * @throws Throwable
348    */
349   public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
350       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
351   throws Throwable {
352     try (Table table = connection.getTable(tableName)) {
353         return sum(table, ci, scan);
354     }
355   }
356 
357   /**
358    * It sums up the value returned from various regions. In case qualifier is
359    * null, summation of all the column qualifiers in the given family is done.
360    * @param table
361    * @param ci
362    * @param scan
363    * @return sum <S>
364    * @throws Throwable
365    */
366   public <R, S, P extends Message, Q extends Message, T extends Message>
367   S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
368       final Scan scan) throws Throwable {
369     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
370 
371     class SumCallBack implements Batch.Callback<S> {
372       S sumVal = null;
373 
374       public S getSumResult() {
375         return sumVal;
376       }
377 
378       @Override
379       public synchronized void update(byte[] region, byte[] row, S result) {
380         sumVal = ci.add(sumVal, result);
381       }
382     }
383     SumCallBack sumCallBack = new SumCallBack();
384     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
385         new Batch.Call<AggregateService, S>() {
386           @Override
387           public S call(AggregateService instance) throws IOException {
388             ServerRpcController controller = new ServerRpcController();
389             BlockingRpcCallback<AggregateResponse> rpcCallback =
390                 new BlockingRpcCallback<AggregateResponse>();
391             instance.getSum(controller, requestArg, rpcCallback);
392             AggregateResponse response = rpcCallback.get();
393             if (controller.failedOnException()) {
394               throw controller.getFailedOn();
395             }
396             if (response.getFirstPartCount() == 0) {
397               return null;
398             }
399             ByteString b = response.getFirstPart(0);
400             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
401             S s = ci.getPromotedValueFromProto(t);
402             return s;
403           }
404         }, sumCallBack);
405     return sumCallBack.getSumResult();
406   }
407 
408   /**
409    * It computes average while fetching sum and row count from all the
410    * corresponding regions. Approach is to compute a global sum of region level
411    * sum and rowcount and then compute the average.
412    * @param tableName
413    * @param scan
414    * @throws Throwable
415    */
416   private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
417       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
418       throws Throwable {
419     try (Table table = connection.getTable(tableName)) {
420         return getAvgArgs(table, ci, scan);
421     }
422   }
423 
424   /**
425    * It computes average while fetching sum and row count from all the
426    * corresponding regions. Approach is to compute a global sum of region level
427    * sum and rowcount and then compute the average.
428    * @param table
429    * @param scan
430    * @throws Throwable
431    */
432   private <R, S, P extends Message, Q extends Message, T extends Message>
433   Pair<S, Long> getAvgArgs(final Table table,
434       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
435     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
436     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
437       S sum = null;
438       Long rowCount = 0l;
439 
440       public synchronized Pair<S, Long> getAvgArgs() {
441         return new Pair<S, Long>(sum, rowCount);
442       }
443 
444       @Override
445       public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
446         sum = ci.add(sum, result.getFirst());
447         rowCount += result.getSecond();
448       }
449     }
450     AvgCallBack avgCallBack = new AvgCallBack();
451     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
452         new Batch.Call<AggregateService, Pair<S, Long>>() {
453           @Override
454           public Pair<S, Long> call(AggregateService instance) throws IOException {
455             ServerRpcController controller = new ServerRpcController();
456             BlockingRpcCallback<AggregateResponse> rpcCallback =
457                 new BlockingRpcCallback<AggregateResponse>();
458             instance.getAvg(controller, requestArg, rpcCallback);
459             AggregateResponse response = rpcCallback.get();
460             if (controller.failedOnException()) {
461               throw controller.getFailedOn();
462             }
463             Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
464             if (response.getFirstPartCount() == 0) {
465               return pair;
466             }
467             ByteString b = response.getFirstPart(0);
468             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
469             S s = ci.getPromotedValueFromProto(t);
470             pair.setFirst(s);
471             ByteBuffer bb = ByteBuffer.allocate(8).put(
472                 getBytesFromResponse(response.getSecondPart()));
473             bb.rewind();
474             pair.setSecond(bb.getLong());
475             return pair;
476           }
477         }, avgCallBack);
478     return avgCallBack.getAvgArgs();
479   }
480 
481   /**
482    * This is the client side interface/handle for calling the average method for
483    * a given cf-cq combination. It was necessary to add one more call stack as
484    * its return type should be a decimal value, irrespective of what
485    * columninterpreter says. So, this methods collects the necessary parameters
486    * to compute the average and returs the double value.
487    * @param tableName
488    * @param ci
489    * @param scan
490    * @return <R, S>
491    * @throws Throwable
492    */
493   public <R, S, P extends Message, Q extends Message, T extends Message>
494   double avg(final TableName tableName,
495       final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
496     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
497     return ci.divideForAvg(p.getFirst(), p.getSecond());
498   }
499 
500   /**
501    * This is the client side interface/handle for calling the average method for
502    * a given cf-cq combination. It was necessary to add one more call stack as
503    * its return type should be a decimal value, irrespective of what
504    * columninterpreter says. So, this methods collects the necessary parameters
505    * to compute the average and returs the double value.
506    * @param table
507    * @param ci
508    * @param scan
509    * @return <R, S>
510    * @throws Throwable
511    */
512   public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
513       final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
514     Pair<S, Long> p = getAvgArgs(table, ci, scan);
515     return ci.divideForAvg(p.getFirst(), p.getSecond());
516   }
517 
518   /**
519    * It computes a global standard deviation for a given column and its value.
520    * Standard deviation is square root of (average of squares -
521    * average*average). From individual regions, it obtains sum, square sum and
522    * number of rows. With these, the above values are computed to get the global
523    * std.
524    * @param table
525    * @param scan
526    * @return standard deviations
527    * @throws Throwable
528    */
529   private <R, S, P extends Message, Q extends Message, T extends Message>
530   Pair<List<S>, Long> getStdArgs(final Table table,
531       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
532     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
533     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
534       long rowCountVal = 0l;
535       S sumVal = null, sumSqVal = null;
536 
537       public synchronized Pair<List<S>, Long> getStdParams() {
538         List<S> l = new ArrayList<S>();
539         l.add(sumVal);
540         l.add(sumSqVal);
541         Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
542         return p;
543       }
544 
545       @Override
546       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
547         if (result.getFirst().size() > 0) {
548           sumVal = ci.add(sumVal, result.getFirst().get(0));
549           sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
550           rowCountVal += result.getSecond();
551         }
552       }
553     }
554     StdCallback stdCallback = new StdCallback();
555     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
556         new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
557           @Override
558           public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
559             ServerRpcController controller = new ServerRpcController();
560             BlockingRpcCallback<AggregateResponse> rpcCallback =
561                 new BlockingRpcCallback<AggregateResponse>();
562             instance.getStd(controller, requestArg, rpcCallback);
563             AggregateResponse response = rpcCallback.get();
564             if (controller.failedOnException()) {
565               throw controller.getFailedOn();
566             }
567             Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
568             if (response.getFirstPartCount() == 0) {
569               return pair;
570             }
571             List<S> list = new ArrayList<S>();
572             for (int i = 0; i < response.getFirstPartCount(); i++) {
573               ByteString b = response.getFirstPart(i);
574               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
575               S s = ci.getPromotedValueFromProto(t);
576               list.add(s);
577             }
578             pair.setFirst(list);
579             ByteBuffer bb = ByteBuffer.allocate(8).put(
580                 getBytesFromResponse(response.getSecondPart()));
581             bb.rewind();
582             pair.setSecond(bb.getLong());
583             return pair;
584           }
585         }, stdCallback);
586     return stdCallback.getStdParams();
587   }
588 
589   /**
590    * This is the client side interface/handle for calling the std method for a
591    * given cf-cq combination. It was necessary to add one more call stack as its
592    * return type should be a decimal value, irrespective of what
593    * columninterpreter says. So, this methods collects the necessary parameters
594    * to compute the std and returns the double value.
595    * @param tableName
596    * @param ci
597    * @param scan
598    * @return <R, S>
599    * @throws Throwable
600    */
601   public <R, S, P extends Message, Q extends Message, T extends Message>
602   double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
603       Scan scan) throws Throwable {
604     try (Table table = connection.getTable(tableName)) {
605         return std(table, ci, scan);
606     }
607   }
608 
609   /**
610    * This is the client side interface/handle for calling the std method for a
611    * given cf-cq combination. It was necessary to add one more call stack as its
612    * return type should be a decimal value, irrespective of what
613    * columninterpreter says. So, this methods collects the necessary parameters
614    * to compute the std and returns the double value.
615    * @param table
616    * @param ci
617    * @param scan
618    * @return <R, S>
619    * @throws Throwable
620    */
621   public <R, S, P extends Message, Q extends Message, T extends Message> double std(
622       final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
623     Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
624     double res = 0d;
625     double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
626     double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
627     res = avgOfSumSq - (avg) * (avg); // variance
628     res = Math.pow(res, 0.5);
629     return res;
630   }
631 
632   /**
633    * It helps locate the region with median for a given column whose weight
634    * is specified in an optional column.
635    * From individual regions, it obtains sum of values and sum of weights.
636    * @param table
637    * @param ci
638    * @param scan
639    * @return pair whose first element is a map between start row of the region
640    *  and (sum of values, sum of weights) for the region, the second element is
641    *  (sum of values, sum of weights) for all the regions chosen
642    * @throws Throwable
643    */
644   private <R, S, P extends Message, Q extends Message, T extends Message>
645   Pair<NavigableMap<byte[], List<S>>, List<S>>
646   getMedianArgs(final Table table,
647       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
648     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
649     final NavigableMap<byte[], List<S>> map =
650       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
651     class StdCallback implements Batch.Callback<List<S>> {
652       S sumVal = null, sumWeights = null;
653 
654       public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
655         List<S> l = new ArrayList<S>();
656         l.add(sumVal);
657         l.add(sumWeights);
658         Pair<NavigableMap<byte[], List<S>>, List<S>> p =
659           new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
660         return p;
661       }
662 
663       @Override
664       public synchronized void update(byte[] region, byte[] row, List<S> result) {
665         map.put(row, result);
666         sumVal = ci.add(sumVal, result.get(0));
667         sumWeights = ci.add(sumWeights, result.get(1));
668       }
669     }
670     StdCallback stdCallback = new StdCallback();
671     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
672         new Batch.Call<AggregateService, List<S>>() {
673           @Override
674           public List<S> call(AggregateService instance) throws IOException {
675             ServerRpcController controller = new ServerRpcController();
676             BlockingRpcCallback<AggregateResponse> rpcCallback =
677                 new BlockingRpcCallback<AggregateResponse>();
678             instance.getMedian(controller, requestArg, rpcCallback);
679             AggregateResponse response = rpcCallback.get();
680             if (controller.failedOnException()) {
681               throw controller.getFailedOn();
682             }
683 
684             List<S> list = new ArrayList<S>();
685             for (int i = 0; i < response.getFirstPartCount(); i++) {
686               ByteString b = response.getFirstPart(i);
687               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
688               S s = ci.getPromotedValueFromProto(t);
689               list.add(s);
690             }
691             return list;
692           }
693 
694         }, stdCallback);
695     return stdCallback.getMedianParams();
696   }
697 
698   /**
699    * This is the client side interface/handler for calling the median method for a
700    * given cf-cq combination. This method collects the necessary parameters
701    * to compute the median and returns the median.
702    * @param tableName
703    * @param ci
704    * @param scan
705    * @return R the median
706    * @throws Throwable
707    */
708   public <R, S, P extends Message, Q extends Message, T extends Message>
709   R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
710       Scan scan) throws Throwable {
711     try (Table table = connection.getTable(tableName)) {
712         return median(table, ci, scan);
713     }
714   }
715 
716   /**
717    * This is the client side interface/handler for calling the median method for a
718    * given cf-cq combination. This method collects the necessary parameters
719    * to compute the median and returns the median.
720    * @param table
721    * @param ci
722    * @param scan
723    * @return R the median
724    * @throws Throwable
725    */
726   public <R, S, P extends Message, Q extends Message, T extends Message>
727   R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
728       Scan scan) throws Throwable {
729     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
730     byte[] startRow = null;
731     byte[] colFamily = scan.getFamilies()[0];
732     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
733     NavigableMap<byte[], List<S>> map = p.getFirst();
734     S sumVal = p.getSecond().get(0);
735     S sumWeights = p.getSecond().get(1);
736     double halfSumVal = ci.divideForAvg(sumVal, 2L);
737     double movingSumVal = 0;
738     boolean weighted = false;
739     if (quals.size() > 1) {
740       weighted = true;
741       halfSumVal = ci.divideForAvg(sumWeights, 2L);
742     }
743 
744     for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
745       S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
746       double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
747       if (newSumVal > halfSumVal) break;  // we found the region with the median
748       movingSumVal = newSumVal;
749       startRow = entry.getKey();
750     }
751     // scan the region with median and find it
752     Scan scan2 = new Scan(scan);
753     // inherit stop row from method parameter
754     if (startRow != null) scan2.setStartRow(startRow);
755     ResultScanner scanner = null;
756     try {
757       int cacheSize = scan2.getCaching();
758       if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
759         scan2.setCacheBlocks(true);
760         cacheSize = 5;
761         scan2.setCaching(cacheSize);
762       }
763       scanner = table.getScanner(scan2);
764       Result[] results = null;
765       byte[] qualifier = quals.pollFirst();
766       // qualifier for the weight column
767       byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
768       R value = null;
769       do {
770         results = scanner.next(cacheSize);
771         if (results != null && results.length > 0) {
772           for (int i = 0; i < results.length; i++) {
773             Result r = results[i];
774             // retrieve weight
775             Cell kv = r.getColumnLatest(colFamily, weightQualifier);
776             R newValue = ci.getValue(colFamily, weightQualifier, kv);
777             S s = ci.castToReturnType(newValue);
778             double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
779             // see if we have moved past the median
780             if (newSumVal > halfSumVal) {
781               return value;
782             }
783             movingSumVal = newSumVal;
784             kv = r.getColumnLatest(colFamily, qualifier);
785             value = ci.getValue(colFamily, qualifier, kv);
786             }
787           }
788       } while (results != null && results.length > 0);
789     } finally {
790       if (scanner != null) {
791         scanner.close();
792       }
793     }
794     return null;
795   }
796 
797   <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
798   validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
799       throws IOException {
800     validateParameters(scan, canFamilyBeAbsent);
801     final AggregateRequest.Builder requestBuilder =
802         AggregateRequest.newBuilder();
803     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
804     P columnInterpreterSpecificData = null;
805     if ((columnInterpreterSpecificData = ci.getRequestData())
806        != null) {
807       requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
808     }
809     requestBuilder.setScan(ProtobufUtil.toScan(scan));
810     return requestBuilder.build();
811   }
812 
813   byte[] getBytesFromResponse(ByteString response) {
814     ByteBuffer bb = response.asReadOnlyByteBuffer();
815     bb.rewind();
816     byte[] bytes;
817     if (bb.hasArray()) {
818       bytes = bb.array();
819     } else {
820       bytes = response.toByteArray();
821     }
822     return bytes;
823   }
824 }