View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client.coprocessor;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.NavigableSet;
30  import java.util.TreeMap;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.client.Connection;
41  import org.apache.hadoop.hbase.client.ConnectionFactory;
42  import org.apache.hadoop.hbase.client.Result;
43  import org.apache.hadoop.hbase.client.ResultScanner;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.client.Table;
46  import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
47  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
48  import org.apache.hadoop.hbase.ipc.ServerRpcController;
49  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
50  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
51  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
52  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.apache.hadoop.hbase.util.Pair;
55  
56  import com.google.protobuf.ByteString;
57  import com.google.protobuf.Message;
58  
/**
 * This client class is for invoking the aggregate functions deployed on the
 * Region Server side via the AggregateService. This class will implement the
 * supporting functionality for summing/processing the individual results
 * obtained from the AggregateService for each region.
 * <p>
 * This will serve as the client side handler for invoking the aggregate
 * functions.
 * <p>
 * For all aggregate functions,
 * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
 * <li>Column family can't be null. In case where multiple families are
 * provided, an IOException will be thrown. An optional column qualifier can
 * also be defined.</li>
 * <li>For methods to find maximum, minimum and sum, it returns the
 * parameter type. For average and std, it returns a double value. For row
 * count, it returns a long value.</li>
 * </ul>
 * <p>Call {@link #close()} when done.
 */
79  @InterfaceAudience.Private
80  public class AggregationClient implements Closeable {
81    // TODO: This class is not used.  Move to examples?
82    private static final Log log = LogFactory.getLog(AggregationClient.class);
83    private final Connection connection;
84  
85    /**
86     * Constructor with Conf object
87     * @param cfg
88     */
89    public AggregationClient(Configuration cfg) {
90      try {
91        // Create a connection on construction. Will use it making each of the calls below.
92        this.connection = ConnectionFactory.createConnection(cfg);
93      } catch (IOException e) {
94        throw new RuntimeException(e);
95      }
96    }
97  
98    @Override
99    public void close() throws IOException {
100     if (this.connection != null && !this.connection.isClosed()) {
101       this.connection.close();
102     }
103   }
104 
105   /**
106    * It gives the maximum value of a column for a given column family for the
107    * given range. In case qualifier is null, a max of all values for the given
108    * family is returned.
109    * @param tableName
110    * @param ci
111    * @param scan
112    * @return max val <R>
113    * @throws Throwable
114    *           The caller is supposed to handle the exception as they are thrown
115    *           & propagated to it.
116    */
117   public <R, S, P extends Message, Q extends Message, T extends Message> R max(
118       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
119   throws Throwable {
120     try (Table table = connection.getTable(tableName)) {
121       return max(table, ci, scan);
122     }
123   }
124 
125   /**
126    * It gives the maximum value of a column for a given column family for the
127    * given range. In case qualifier is null, a max of all values for the given
128    * family is returned.
129    * @param table
130    * @param ci
131    * @param scan
132    * @return max val <R>
133    * @throws Throwable
134    *           The caller is supposed to handle the exception as they are thrown
135    *           & propagated to it.
136    */
137   public <R, S, P extends Message, Q extends Message, T extends Message> 
138   R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
139       final Scan scan) throws Throwable {
140     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
141     class MaxCallBack implements Batch.Callback<R> {
142       R max = null;
143 
144       R getMax() {
145         return max;
146       }
147 
148       @Override
149       public synchronized void update(byte[] region, byte[] row, R result) {
150         max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
151       }
152     }
153     MaxCallBack aMaxCallBack = new MaxCallBack();
154     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
155         new Batch.Call<AggregateService, R>() {
156           @Override
157           public R call(AggregateService instance) throws IOException {
158             ServerRpcController controller = new ServerRpcController();
159             BlockingRpcCallback<AggregateResponse> rpcCallback = 
160                 new BlockingRpcCallback<AggregateResponse>();
161             instance.getMax(controller, requestArg, rpcCallback);
162             AggregateResponse response = rpcCallback.get();
163             if (controller.failedOnException()) {
164               throw controller.getFailedOn();
165             }
166             if (response.getFirstPartCount() > 0) {
167               ByteString b = response.getFirstPart(0);
168               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
169               return ci.getCellValueFromProto(q);
170             }
171             return null;
172           }
173         }, aMaxCallBack);
174     return aMaxCallBack.getMax();
175   }
176 
177   /*
178    * @param scan
179    * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
180    */
181   private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
182     if (scan == null
183         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
184           scan.getStartRow(), HConstants.EMPTY_START_ROW))
185         || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
186           scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
187       throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
188     } else if (!canFamilyBeAbsent) {
189       if (scan.getFamilyMap().size() != 1) {
190         throw new IOException("There must be only one family.");
191       }
192     }
193   }
194 
195   /**
196    * It gives the minimum value of a column for a given column family for the
197    * given range. In case qualifier is null, a min of all values for the given
198    * family is returned.
199    * @param tableName
200    * @param ci
201    * @param scan
202    * @return min val <R>
203    * @throws Throwable
204    */
205   public <R, S, P extends Message, Q extends Message, T extends Message> R min(
206       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
207   throws Throwable {
208     try (Table table = connection.getTable(tableName)) {
209       return min(table, ci, scan);
210     }
211   }
212 
213   /**
214    * It gives the minimum value of a column for a given column family for the
215    * given range. In case qualifier is null, a min of all values for the given
216    * family is returned.
217    * @param table
218    * @param ci
219    * @param scan
220    * @return min val <R>
221    * @throws Throwable
222    */
223   public <R, S, P extends Message, Q extends Message, T extends Message> 
224   R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
225       final Scan scan) throws Throwable {
226     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
227     class MinCallBack implements Batch.Callback<R> {
228 
229       private R min = null;
230 
231       public R getMinimum() {
232         return min;
233       }
234 
235       @Override
236       public synchronized void update(byte[] region, byte[] row, R result) {
237         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
238       }
239     }
240     MinCallBack minCallBack = new MinCallBack();
241     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
242         new Batch.Call<AggregateService, R>() {
243 
244           @Override
245           public R call(AggregateService instance) throws IOException {
246             ServerRpcController controller = new ServerRpcController();
247             BlockingRpcCallback<AggregateResponse> rpcCallback = 
248                 new BlockingRpcCallback<AggregateResponse>();
249             instance.getMin(controller, requestArg, rpcCallback);
250             AggregateResponse response = rpcCallback.get();
251             if (controller.failedOnException()) {
252               throw controller.getFailedOn();
253             }
254             if (response.getFirstPartCount() > 0) {
255               ByteString b = response.getFirstPart(0);
256               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
257               return ci.getCellValueFromProto(q);
258             }
259             return null;
260           }
261         }, minCallBack);
262     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
263     return minCallBack.getMinimum();
264   }
265 
266   /**
267    * It gives the row count, by summing up the individual results obtained from
268    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
269    * optimised the operation. In case qualifier is provided, I can't use the
270    * filter as it may set the flag to skip to next row, but the value read is
271    * not of the given filter: in this case, this particular row will not be
272    * counted ==> an error.
273    * @param tableName
274    * @param ci
275    * @param scan
276    * @return <R, S>
277    * @throws Throwable
278    */
279   public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
280       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
281   throws Throwable {
282     try (Table table = connection.getTable(tableName)) {
283         return rowCount(table, ci, scan);
284     }
285   }
286 
287   /**
288    * It gives the row count, by summing up the individual results obtained from
289    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
290    * optimised the operation. In case qualifier is provided, I can't use the
291    * filter as it may set the flag to skip to next row, but the value read is
292    * not of the given filter: in this case, this particular row will not be
293    * counted ==> an error.
294    * @param table
295    * @param ci
296    * @param scan
297    * @return <R, S>
298    * @throws Throwable
299    */
300   public <R, S, P extends Message, Q extends Message, T extends Message> 
301   long rowCount(final Table table,
302       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
303     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
304     class RowNumCallback implements Batch.Callback<Long> {
305       private final AtomicLong rowCountL = new AtomicLong(0);
306 
307       public long getRowNumCount() {
308         return rowCountL.get();
309       }
310 
311       @Override
312       public void update(byte[] region, byte[] row, Long result) {
313         rowCountL.addAndGet(result.longValue());
314       }
315     }
316     RowNumCallback rowNum = new RowNumCallback();
317     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
318         new Batch.Call<AggregateService, Long>() {
319           @Override
320           public Long call(AggregateService instance) throws IOException {
321             ServerRpcController controller = new ServerRpcController();
322             BlockingRpcCallback<AggregateResponse> rpcCallback = 
323                 new BlockingRpcCallback<AggregateResponse>();
324             instance.getRowNum(controller, requestArg, rpcCallback);
325             AggregateResponse response = rpcCallback.get();
326             if (controller.failedOnException()) {
327               throw controller.getFailedOn();
328             }
329             byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
330             ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
331             bb.rewind();
332             return bb.getLong();
333           }
334         }, rowNum);
335     return rowNum.getRowNumCount();
336   }
337 
338   /**
339    * It sums up the value returned from various regions. In case qualifier is
340    * null, summation of all the column qualifiers in the given family is done.
341    * @param tableName
342    * @param ci
343    * @param scan
344    * @return sum <S>
345    * @throws Throwable
346    */
347   public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
348       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
349   throws Throwable {
350     try (Table table = connection.getTable(tableName)) {
351         return sum(table, ci, scan);
352     }
353   }
354 
355   /**
356    * It sums up the value returned from various regions. In case qualifier is
357    * null, summation of all the column qualifiers in the given family is done.
358    * @param table
359    * @param ci
360    * @param scan
361    * @return sum <S>
362    * @throws Throwable
363    */
364   public <R, S, P extends Message, Q extends Message, T extends Message> 
365   S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
366       final Scan scan) throws Throwable {
367     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
368     
369     class SumCallBack implements Batch.Callback<S> {
370       S sumVal = null;
371 
372       public S getSumResult() {
373         return sumVal;
374       }
375 
376       @Override
377       public synchronized void update(byte[] region, byte[] row, S result) {
378         sumVal = ci.add(sumVal, result);
379       }
380     }
381     SumCallBack sumCallBack = new SumCallBack();
382     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
383         new Batch.Call<AggregateService, S>() {
384           @Override
385           public S call(AggregateService instance) throws IOException {
386             ServerRpcController controller = new ServerRpcController();
387             BlockingRpcCallback<AggregateResponse> rpcCallback = 
388                 new BlockingRpcCallback<AggregateResponse>();
389             instance.getSum(controller, requestArg, rpcCallback);
390             AggregateResponse response = rpcCallback.get();
391             if (controller.failedOnException()) {
392               throw controller.getFailedOn();
393             }
394             if (response.getFirstPartCount() == 0) {
395               return null;
396             }
397             ByteString b = response.getFirstPart(0);
398             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
399             S s = ci.getPromotedValueFromProto(t);
400             return s;
401           }
402         }, sumCallBack);
403     return sumCallBack.getSumResult();
404   }
405 
406   /**
407    * It computes average while fetching sum and row count from all the
408    * corresponding regions. Approach is to compute a global sum of region level
409    * sum and rowcount and then compute the average.
410    * @param tableName
411    * @param scan
412    * @throws Throwable
413    */
414   private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
415       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
416       throws Throwable {
417     try (Table table = connection.getTable(tableName)) {
418         return getAvgArgs(table, ci, scan);
419     }
420   }
421 
422   /**
423    * It computes average while fetching sum and row count from all the
424    * corresponding regions. Approach is to compute a global sum of region level
425    * sum and rowcount and then compute the average.
426    * @param table
427    * @param scan
428    * @throws Throwable
429    */
430   private <R, S, P extends Message, Q extends Message, T extends Message>
431   Pair<S, Long> getAvgArgs(final Table table,
432       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
433     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
434     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
435       S sum = null;
436       Long rowCount = 0l;
437 
438       public synchronized Pair<S, Long> getAvgArgs() {
439         return new Pair<S, Long>(sum, rowCount);
440       }
441 
442       @Override
443       public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
444         sum = ci.add(sum, result.getFirst());
445         rowCount += result.getSecond();
446       }
447     }
448     AvgCallBack avgCallBack = new AvgCallBack();
449     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
450         new Batch.Call<AggregateService, Pair<S, Long>>() {
451           @Override
452           public Pair<S, Long> call(AggregateService instance) throws IOException {
453             ServerRpcController controller = new ServerRpcController();
454             BlockingRpcCallback<AggregateResponse> rpcCallback = 
455                 new BlockingRpcCallback<AggregateResponse>();
456             instance.getAvg(controller, requestArg, rpcCallback);
457             AggregateResponse response = rpcCallback.get();
458             if (controller.failedOnException()) {
459               throw controller.getFailedOn();
460             }
461             Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
462             if (response.getFirstPartCount() == 0) {
463               return pair;
464             }
465             ByteString b = response.getFirstPart(0);
466             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
467             S s = ci.getPromotedValueFromProto(t);
468             pair.setFirst(s);
469             ByteBuffer bb = ByteBuffer.allocate(8).put(
470                 getBytesFromResponse(response.getSecondPart()));
471             bb.rewind();
472             pair.setSecond(bb.getLong());
473             return pair;
474           }
475         }, avgCallBack);
476     return avgCallBack.getAvgArgs();
477   }
478 
479   /**
480    * This is the client side interface/handle for calling the average method for
481    * a given cf-cq combination. It was necessary to add one more call stack as
482    * its return type should be a decimal value, irrespective of what
483    * columninterpreter says. So, this methods collects the necessary parameters
484    * to compute the average and returs the double value.
485    * @param tableName
486    * @param ci
487    * @param scan
488    * @return <R, S>
489    * @throws Throwable
490    */
491   public <R, S, P extends Message, Q extends Message, T extends Message>
492   double avg(final TableName tableName,
493       final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
494     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
495     return ci.divideForAvg(p.getFirst(), p.getSecond());
496   }
497 
498   /**
499    * This is the client side interface/handle for calling the average method for
500    * a given cf-cq combination. It was necessary to add one more call stack as
501    * its return type should be a decimal value, irrespective of what
502    * columninterpreter says. So, this methods collects the necessary parameters
503    * to compute the average and returs the double value.
504    * @param table
505    * @param ci
506    * @param scan
507    * @return <R, S>
508    * @throws Throwable
509    */
510   public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
511       final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
512     Pair<S, Long> p = getAvgArgs(table, ci, scan);
513     return ci.divideForAvg(p.getFirst(), p.getSecond());
514   }
515 
516   /**
517    * It computes a global standard deviation for a given column and its value.
518    * Standard deviation is square root of (average of squares -
519    * average*average). From individual regions, it obtains sum, square sum and
520    * number of rows. With these, the above values are computed to get the global
521    * std.
522    * @param table
523    * @param scan
524    * @return standard deviations
525    * @throws Throwable
526    */
527   private <R, S, P extends Message, Q extends Message, T extends Message>
528   Pair<List<S>, Long> getStdArgs(final Table table,
529       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
530     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
531     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
532       long rowCountVal = 0l;
533       S sumVal = null, sumSqVal = null;
534 
535       public synchronized Pair<List<S>, Long> getStdParams() {
536         List<S> l = new ArrayList<S>();
537         l.add(sumVal);
538         l.add(sumSqVal);
539         Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
540         return p;
541       }
542 
543       @Override
544       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
545         if (result.getFirst().size() > 0) {
546           sumVal = ci.add(sumVal, result.getFirst().get(0));
547           sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
548           rowCountVal += result.getSecond();
549         }
550       }
551     }
552     StdCallback stdCallback = new StdCallback();
553     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
554         new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
555           @Override
556           public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
557             ServerRpcController controller = new ServerRpcController();
558             BlockingRpcCallback<AggregateResponse> rpcCallback = 
559                 new BlockingRpcCallback<AggregateResponse>();
560             instance.getStd(controller, requestArg, rpcCallback);
561             AggregateResponse response = rpcCallback.get();
562             if (controller.failedOnException()) {
563               throw controller.getFailedOn();
564             }
565             Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
566             if (response.getFirstPartCount() == 0) {
567               return pair;
568             }
569             List<S> list = new ArrayList<S>();
570             for (int i = 0; i < response.getFirstPartCount(); i++) {
571               ByteString b = response.getFirstPart(i);
572               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
573               S s = ci.getPromotedValueFromProto(t);
574               list.add(s);
575             }
576             pair.setFirst(list);
577             ByteBuffer bb = ByteBuffer.allocate(8).put(
578                 getBytesFromResponse(response.getSecondPart()));
579             bb.rewind();
580             pair.setSecond(bb.getLong());
581             return pair;
582           }
583         }, stdCallback);
584     return stdCallback.getStdParams();
585   }
586 
587   /**
588    * This is the client side interface/handle for calling the std method for a
589    * given cf-cq combination. It was necessary to add one more call stack as its
590    * return type should be a decimal value, irrespective of what
591    * columninterpreter says. So, this methods collects the necessary parameters
592    * to compute the std and returns the double value.
593    * @param tableName
594    * @param ci
595    * @param scan
596    * @return <R, S>
597    * @throws Throwable
598    */
599   public <R, S, P extends Message, Q extends Message, T extends Message>
600   double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
601       Scan scan) throws Throwable {
602     try (Table table = connection.getTable(tableName)) {
603         return std(table, ci, scan);
604     }
605   }
606 
607   /**
608    * This is the client side interface/handle for calling the std method for a
609    * given cf-cq combination. It was necessary to add one more call stack as its
610    * return type should be a decimal value, irrespective of what
611    * columninterpreter says. So, this methods collects the necessary parameters
612    * to compute the std and returns the double value.
613    * @param table
614    * @param ci
615    * @param scan
616    * @return <R, S>
617    * @throws Throwable
618    */
619   public <R, S, P extends Message, Q extends Message, T extends Message> double std(
620       final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
621     Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
622     double res = 0d;
623     double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
624     double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
625     res = avgOfSumSq - (avg) * (avg); // variance
626     res = Math.pow(res, 0.5);
627     return res;
628   }
629 
630   /**
631    * It helps locate the region with median for a given column whose weight 
632    * is specified in an optional column.
633    * From individual regions, it obtains sum of values and sum of weights.
634    * @param table
635    * @param ci
636    * @param scan
637    * @return pair whose first element is a map between start row of the region
638    *  and (sum of values, sum of weights) for the region, the second element is
639    *  (sum of values, sum of weights) for all the regions chosen
640    * @throws Throwable
641    */
642   private <R, S, P extends Message, Q extends Message, T extends Message>
643   Pair<NavigableMap<byte[], List<S>>, List<S>>
644   getMedianArgs(final Table table,
645       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
646     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
647     final NavigableMap<byte[], List<S>> map =
648       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
649     class StdCallback implements Batch.Callback<List<S>> {
650       S sumVal = null, sumWeights = null;
651 
652       public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
653         List<S> l = new ArrayList<S>();
654         l.add(sumVal);
655         l.add(sumWeights);
656         Pair<NavigableMap<byte[], List<S>>, List<S>> p =
657           new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
658         return p;
659       }
660 
661       @Override
662       public synchronized void update(byte[] region, byte[] row, List<S> result) {
663         map.put(row, result);
664         sumVal = ci.add(sumVal, result.get(0));
665         sumWeights = ci.add(sumWeights, result.get(1));
666       }
667     }
668     StdCallback stdCallback = new StdCallback();
669     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
670         new Batch.Call<AggregateService, List<S>>() {
671           @Override
672           public List<S> call(AggregateService instance) throws IOException {
673             ServerRpcController controller = new ServerRpcController();
674             BlockingRpcCallback<AggregateResponse> rpcCallback = 
675                 new BlockingRpcCallback<AggregateResponse>();
676             instance.getMedian(controller, requestArg, rpcCallback);
677             AggregateResponse response = rpcCallback.get();
678             if (controller.failedOnException()) {
679               throw controller.getFailedOn();
680             }
681 
682             List<S> list = new ArrayList<S>();
683             for (int i = 0; i < response.getFirstPartCount(); i++) {
684               ByteString b = response.getFirstPart(i);
685               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
686               S s = ci.getPromotedValueFromProto(t);
687               list.add(s);
688             }
689             return list;
690           }
691 
692         }, stdCallback);
693     return stdCallback.getMedianParams();
694   }
695 
696   /**
697    * This is the client side interface/handler for calling the median method for a
698    * given cf-cq combination. This method collects the necessary parameters
699    * to compute the median and returns the median.
700    * @param tableName
701    * @param ci
702    * @param scan
703    * @return R the median
704    * @throws Throwable
705    */
706   public <R, S, P extends Message, Q extends Message, T extends Message>
707   R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
708       Scan scan) throws Throwable {
709     try (Table table = connection.getTable(tableName)) {
710         return median(table, ci, scan);
711     }
712   }
713 
714   /**
715    * This is the client side interface/handler for calling the median method for a
716    * given cf-cq combination. This method collects the necessary parameters
717    * to compute the median and returns the median.
718    * @param table
719    * @param ci
720    * @param scan
721    * @return R the median
722    * @throws Throwable
723    */
724   public <R, S, P extends Message, Q extends Message, T extends Message>
725   R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
726       Scan scan) throws Throwable {
727     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
728     byte[] startRow = null;
729     byte[] colFamily = scan.getFamilies()[0];
730     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
731     NavigableMap<byte[], List<S>> map = p.getFirst();
732     S sumVal = p.getSecond().get(0);
733     S sumWeights = p.getSecond().get(1);
734     double halfSumVal = ci.divideForAvg(sumVal, 2L);
735     double movingSumVal = 0;
736     boolean weighted = false;
737     if (quals.size() > 1) {
738       weighted = true;
739       halfSumVal = ci.divideForAvg(sumWeights, 2L);
740     }
741     
742     for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
743       S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
744       double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
745       if (newSumVal > halfSumVal) break;  // we found the region with the median
746       movingSumVal = newSumVal;
747       startRow = entry.getKey();
748     }
749     // scan the region with median and find it
750     Scan scan2 = new Scan(scan);
751     // inherit stop row from method parameter
752     if (startRow != null) scan2.setStartRow(startRow);
753     ResultScanner scanner = null;
754     try {
755       int cacheSize = scan2.getCaching();
756       if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
757         scan2.setCacheBlocks(true);
758         cacheSize = 5;
759         scan2.setCaching(cacheSize);
760       }
761       scanner = table.getScanner(scan2);
762       Result[] results = null;
763       byte[] qualifier = quals.pollFirst();
764       // qualifier for the weight column
765       byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
766       R value = null;
767       do {
768         results = scanner.next(cacheSize);
769         if (results != null && results.length > 0) {
770           for (int i = 0; i < results.length; i++) {
771             Result r = results[i];
772             // retrieve weight
773             Cell kv = r.getColumnLatestCell(colFamily, weightQualifier);
774             R newValue = ci.getValue(colFamily, weightQualifier, kv);
775             S s = ci.castToReturnType(newValue);
776             double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
777             // see if we have moved past the median
778             if (newSumVal > halfSumVal) {
779               return value;
780             }
781             movingSumVal = newSumVal;
782             kv = r.getColumnLatestCell(colFamily, qualifier);
783             value = ci.getValue(colFamily, qualifier, kv);
784             }
785           }
786       } while (results != null && results.length > 0);
787     } finally {
788       if (scanner != null) {
789         scanner.close();
790       }
791     }
792     return null;
793   }
794 
795   <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest 
796   validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
797       throws IOException {
798     validateParameters(scan, canFamilyBeAbsent);
799     final AggregateRequest.Builder requestBuilder = 
800         AggregateRequest.newBuilder();
801     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
802     P columnInterpreterSpecificData = null;
803     if ((columnInterpreterSpecificData = ci.getRequestData()) 
804        != null) {
805       requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
806     }
807     requestBuilder.setScan(ProtobufUtil.toScan(scan));
808     return requestBuilder.build();
809   }
810 
811   byte[] getBytesFromResponse(ByteString response) {
812     ByteBuffer bb = response.asReadOnlyByteBuffer();
813     bb.rewind();
814     byte[] bytes;
815     if (bb.hasArray()) {
816       bytes = bb.array();
817     } else {
818       bytes = response.toByteArray();
819     }
820     return bytes;
821   }
822 }