View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client.coprocessor;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.NavigableSet;
29  import java.util.TreeMap;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.HTable;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
44  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
45  import org.apache.hadoop.hbase.ipc.ServerRpcController;
46  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
48  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
49  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.Pair;
52  
53  import com.google.protobuf.ByteString;
54  import com.google.protobuf.Message;
55  
/**
 * This client class is for invoking the aggregate functions deployed on the
 * Region Server side via the AggregateService. This class will implement the
 * supporting functionality for summing/processing the individual results
 * obtained from the AggregateService for each region.
 * <p>
 * This will serve as the client side handler for invoking the aggregate
 * functions.
 * <p>
 * For all aggregate functions,
 * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
 * <li>Column family can't be null. In case where multiple families are
 * provided, an IOException will be thrown. An optional column qualifier can
 * also be defined.</li>
 * <li>For methods to find maximum, minimum and sum, it returns the
 * parameter type. For average and std, it returns a double value. For row
 * count, it returns a long value.</li>
 * </ul>
 */
75  @InterfaceAudience.Private
76  public class AggregationClient {
77  
78    private static final Log log = LogFactory.getLog(AggregationClient.class);
79    Configuration conf;
80  
81    /**
82     * Constructor with Conf object
83     * @param cfg
84     */
85    public AggregationClient(Configuration cfg) {
86      this.conf = cfg;
87    }
88  
89    /**
90     * It gives the maximum value of a column for a given column family for the
91     * given range. In case qualifier is null, a max of all values for the given
92     * family is returned.
93     * @param tableName
94     * @param ci
95     * @param scan
96     * @return max val <R>
97     * @throws Throwable
98     *           The caller is supposed to handle the exception as they are thrown
99     *           & propagated to it.
100    */
101   public <R, S, P extends Message, Q extends Message, T extends Message> R max(
102       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
103       throws Throwable {
104     HTable table = null;
105     try {
106       table = new HTable(conf, tableName);
107       return max(table, ci, scan);
108     } finally {
109       if (table != null) {
110         table.close();
111       }
112     }
113   }
114 
115   /**
116    * It gives the maximum value of a column for a given column family for the
117    * given range. In case qualifier is null, a max of all values for the given
118    * family is returned.
119    * @param table
120    * @param ci
121    * @param scan
122    * @return max val <R>
123    * @throws Throwable
124    *           The caller is supposed to handle the exception as they are thrown
125    *           & propagated to it.
126    */
127   public <R, S, P extends Message, Q extends Message, T extends Message> 
128   R max(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
129       final Scan scan) throws Throwable {
130     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
131     class MaxCallBack implements Batch.Callback<R> {
132       R max = null;
133 
134       R getMax() {
135         return max;
136       }
137 
138       @Override
139       public synchronized void update(byte[] region, byte[] row, R result) {
140         max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
141       }
142     }
143     MaxCallBack aMaxCallBack = new MaxCallBack();
144     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
145         new Batch.Call<AggregateService, R>() {
146           @Override
147           public R call(AggregateService instance) throws IOException {
148             ServerRpcController controller = new ServerRpcController();
149             BlockingRpcCallback<AggregateResponse> rpcCallback = 
150                 new BlockingRpcCallback<AggregateResponse>();
151             instance.getMax(controller, requestArg, rpcCallback);
152             AggregateResponse response = rpcCallback.get();
153             if (controller.failedOnException()) {
154               throw controller.getFailedOn();
155             }
156             if (response.getFirstPartCount() > 0) {
157               ByteString b = response.getFirstPart(0);
158               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
159               return ci.getCellValueFromProto(q);
160             }
161             return null;
162           }
163         }, aMaxCallBack);
164     return aMaxCallBack.getMax();
165   }
166 
167   /*
168    * @param scan
169    * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
170    */
171   private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
172     if (scan == null
173         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
174             .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
175         || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
176         	!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
177       throw new IOException(
178           "Agg client Exception: Startrow should be smaller than Stoprow");
179     } else if (!canFamilyBeAbsent) {
180       if (scan.getFamilyMap().size() != 1) {
181         throw new IOException("There must be only one family.");
182       }
183     }
184   }
185 
186   /**
187    * It gives the minimum value of a column for a given column family for the
188    * given range. In case qualifier is null, a min of all values for the given
189    * family is returned.
190    * @param tableName
191    * @param ci
192    * @param scan
193    * @return min val <R>
194    * @throws Throwable
195    */
196   public <R, S, P extends Message, Q extends Message, T extends Message> R min(
197       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
198       throws Throwable {
199     HTable table = null;
200     try {
201       table = new HTable(conf, tableName);
202       return min(table, ci, scan);
203     } finally {
204       if (table != null) {
205         table.close();
206       }
207     }
208   }
209 
210   /**
211    * It gives the minimum value of a column for a given column family for the
212    * given range. In case qualifier is null, a min of all values for the given
213    * family is returned.
214    * @param table
215    * @param ci
216    * @param scan
217    * @return min val <R>
218    * @throws Throwable
219    */
220   public <R, S, P extends Message, Q extends Message, T extends Message> 
221   R min(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
222       final Scan scan) throws Throwable {
223     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
224     class MinCallBack implements Batch.Callback<R> {
225 
226       private R min = null;
227 
228       public R getMinimum() {
229         return min;
230       }
231 
232       @Override
233       public synchronized void update(byte[] region, byte[] row, R result) {
234         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
235       }
236     }
237     MinCallBack minCallBack = new MinCallBack();
238     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
239         new Batch.Call<AggregateService, R>() {
240 
241           @Override
242           public R call(AggregateService instance) throws IOException {
243             ServerRpcController controller = new ServerRpcController();
244             BlockingRpcCallback<AggregateResponse> rpcCallback = 
245                 new BlockingRpcCallback<AggregateResponse>();
246             instance.getMin(controller, requestArg, rpcCallback);
247             AggregateResponse response = rpcCallback.get();
248             if (controller.failedOnException()) {
249               throw controller.getFailedOn();
250             }
251             if (response.getFirstPartCount() > 0) {
252               ByteString b = response.getFirstPart(0);
253               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
254               return ci.getCellValueFromProto(q);
255             }
256             return null;
257           }
258         }, minCallBack);
259     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
260     return minCallBack.getMinimum();
261   }
262 
263   /**
264    * It gives the row count, by summing up the individual results obtained from
265    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
266    * optimised the operation. In case qualifier is provided, I can't use the
267    * filter as it may set the flag to skip to next row, but the value read is
268    * not of the given filter: in this case, this particular row will not be
269    * counted ==> an error.
270    * @param tableName
271    * @param ci
272    * @param scan
273    * @return <R, S>
274    * @throws Throwable
275    */
276   public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
277       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
278       throws Throwable {
279     HTable table = null;
280     try {
281       table = new HTable(conf, tableName);
282       return rowCount(table, ci, scan);
283     } finally {
284       if (table != null) {
285         table.close();
286       }
287     }
288   }
289 
290   /**
291    * It gives the row count, by summing up the individual results obtained from
292    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
293    * optimised the operation. In case qualifier is provided, I can't use the
294    * filter as it may set the flag to skip to next row, but the value read is
295    * not of the given filter: in this case, this particular row will not be
296    * counted ==> an error.
297    * @param table
298    * @param ci
299    * @param scan
300    * @return <R, S>
301    * @throws Throwable
302    */
303   public <R, S, P extends Message, Q extends Message, T extends Message> 
304   long rowCount(final HTable table,
305       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
306     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
307     class RowNumCallback implements Batch.Callback<Long> {
308       private final AtomicLong rowCountL = new AtomicLong(0);
309 
310       public long getRowNumCount() {
311         return rowCountL.get();
312       }
313 
314       @Override
315       public void update(byte[] region, byte[] row, Long result) {
316         rowCountL.addAndGet(result.longValue());
317       }
318     }
319     RowNumCallback rowNum = new RowNumCallback();
320     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
321         new Batch.Call<AggregateService, Long>() {
322           @Override
323           public Long call(AggregateService instance) throws IOException {
324             ServerRpcController controller = new ServerRpcController();
325             BlockingRpcCallback<AggregateResponse> rpcCallback = 
326                 new BlockingRpcCallback<AggregateResponse>();
327             instance.getRowNum(controller, requestArg, rpcCallback);
328             AggregateResponse response = rpcCallback.get();
329             if (controller.failedOnException()) {
330               throw controller.getFailedOn();
331             }
332             byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
333             ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
334             bb.rewind();
335             return bb.getLong();
336           }
337         }, rowNum);
338     return rowNum.getRowNumCount();
339   }
340 
341   /**
342    * It sums up the value returned from various regions. In case qualifier is
343    * null, summation of all the column qualifiers in the given family is done.
344    * @param tableName
345    * @param ci
346    * @param scan
347    * @return sum <S>
348    * @throws Throwable
349    */
350   public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
351       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
352       throws Throwable {
353     HTable table = null;
354     try {
355       table = new HTable(conf, tableName);
356       return sum(table, ci, scan);
357     } finally {
358       if (table != null) {
359         table.close();
360       }
361     }
362   }
363 
364   /**
365    * It sums up the value returned from various regions. In case qualifier is
366    * null, summation of all the column qualifiers in the given family is done.
367    * @param table
368    * @param ci
369    * @param scan
370    * @return sum <S>
371    * @throws Throwable
372    */
373   public <R, S, P extends Message, Q extends Message, T extends Message> 
374   S sum(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
375       final Scan scan) throws Throwable {
376     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
377     
378     class SumCallBack implements Batch.Callback<S> {
379       S sumVal = null;
380 
381       public S getSumResult() {
382         return sumVal;
383       }
384 
385       @Override
386       public synchronized void update(byte[] region, byte[] row, S result) {
387         sumVal = ci.add(sumVal, result);
388       }
389     }
390     SumCallBack sumCallBack = new SumCallBack();
391     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
392         new Batch.Call<AggregateService, S>() {
393           @Override
394           public S call(AggregateService instance) throws IOException {
395             ServerRpcController controller = new ServerRpcController();
396             BlockingRpcCallback<AggregateResponse> rpcCallback = 
397                 new BlockingRpcCallback<AggregateResponse>();
398             instance.getSum(controller, requestArg, rpcCallback);
399             AggregateResponse response = rpcCallback.get();
400             if (controller.failedOnException()) {
401               throw controller.getFailedOn();
402             }
403             if (response.getFirstPartCount() == 0) {
404               return null;
405             }
406             ByteString b = response.getFirstPart(0);
407             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
408             S s = ci.getPromotedValueFromProto(t);
409             return s;
410           }
411         }, sumCallBack);
412     return sumCallBack.getSumResult();
413   }
414 
415   /**
416    * It computes average while fetching sum and row count from all the
417    * corresponding regions. Approach is to compute a global sum of region level
418    * sum and rowcount and then compute the average.
419    * @param tableName
420    * @param scan
421    * @throws Throwable
422    */
423   private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
424       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
425       throws Throwable {
426     HTable table = null;
427     try {
428       table = new HTable(conf, tableName);
429       return getAvgArgs(table, ci, scan);
430     } finally {
431       if (table != null) {
432         table.close();
433       }
434     }
435   }
436 
437   /**
438    * It computes average while fetching sum and row count from all the
439    * corresponding regions. Approach is to compute a global sum of region level
440    * sum and rowcount and then compute the average.
441    * @param table
442    * @param scan
443    * @throws Throwable
444    */
445   private <R, S, P extends Message, Q extends Message, T extends Message>
446   Pair<S, Long> getAvgArgs(final HTable table,
447       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
448     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
449     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
450       S sum = null;
451       Long rowCount = 0l;
452 
453       public synchronized Pair<S, Long> getAvgArgs() {
454         return new Pair<S, Long>(sum, rowCount);
455       }
456 
457       @Override
458       public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
459         sum = ci.add(sum, result.getFirst());
460         rowCount += result.getSecond();
461       }
462     }
463     AvgCallBack avgCallBack = new AvgCallBack();
464     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
465         new Batch.Call<AggregateService, Pair<S, Long>>() {
466           @Override
467           public Pair<S, Long> call(AggregateService instance) throws IOException {
468             ServerRpcController controller = new ServerRpcController();
469             BlockingRpcCallback<AggregateResponse> rpcCallback = 
470                 new BlockingRpcCallback<AggregateResponse>();
471             instance.getAvg(controller, requestArg, rpcCallback);
472             AggregateResponse response = rpcCallback.get();
473             if (controller.failedOnException()) {
474               throw controller.getFailedOn();
475             }
476             Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
477             if (response.getFirstPartCount() == 0) {
478               return pair;
479             }
480             ByteString b = response.getFirstPart(0);
481             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
482             S s = ci.getPromotedValueFromProto(t);
483             pair.setFirst(s);
484             ByteBuffer bb = ByteBuffer.allocate(8).put(
485                 getBytesFromResponse(response.getSecondPart()));
486             bb.rewind();
487             pair.setSecond(bb.getLong());
488             return pair;
489           }
490         }, avgCallBack);
491     return avgCallBack.getAvgArgs();
492   }
493 
494   /**
495    * This is the client side interface/handle for calling the average method for
496    * a given cf-cq combination. It was necessary to add one more call stack as
497    * its return type should be a decimal value, irrespective of what
498    * columninterpreter says. So, this methods collects the necessary parameters
499    * to compute the average and returs the double value.
500    * @param tableName
501    * @param ci
502    * @param scan
503    * @return <R, S>
504    * @throws Throwable
505    */
506   public <R, S, P extends Message, Q extends Message, T extends Message>
507   double avg(final TableName tableName,
508       final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
509     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
510     return ci.divideForAvg(p.getFirst(), p.getSecond());
511   }
512 
513   /**
514    * This is the client side interface/handle for calling the average method for
515    * a given cf-cq combination. It was necessary to add one more call stack as
516    * its return type should be a decimal value, irrespective of what
517    * columninterpreter says. So, this methods collects the necessary parameters
518    * to compute the average and returs the double value.
519    * @param table
520    * @param ci
521    * @param scan
522    * @return <R, S>
523    * @throws Throwable
524    */
525   public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
526       final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
527     Pair<S, Long> p = getAvgArgs(table, ci, scan);
528     return ci.divideForAvg(p.getFirst(), p.getSecond());
529   }
530 
531   /**
532    * It computes a global standard deviation for a given column and its value.
533    * Standard deviation is square root of (average of squares -
534    * average*average). From individual regions, it obtains sum, square sum and
535    * number of rows. With these, the above values are computed to get the global
536    * std.
537    * @param table
538    * @param scan
539    * @return standard deviations
540    * @throws Throwable
541    */
542   private <R, S, P extends Message, Q extends Message, T extends Message>
543   Pair<List<S>, Long> getStdArgs(final HTable table,
544       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
545     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
546     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
547       long rowCountVal = 0l;
548       S sumVal = null, sumSqVal = null;
549 
550       public synchronized Pair<List<S>, Long> getStdParams() {
551         List<S> l = new ArrayList<S>();
552         l.add(sumVal);
553         l.add(sumSqVal);
554         Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
555         return p;
556       }
557 
558       @Override
559       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
560         if (result.getFirst().size() > 0) {
561           sumVal = ci.add(sumVal, result.getFirst().get(0));
562           sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
563           rowCountVal += result.getSecond();
564         }
565       }
566     }
567     StdCallback stdCallback = new StdCallback();
568     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
569         new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
570           @Override
571           public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
572             ServerRpcController controller = new ServerRpcController();
573             BlockingRpcCallback<AggregateResponse> rpcCallback = 
574                 new BlockingRpcCallback<AggregateResponse>();
575             instance.getStd(controller, requestArg, rpcCallback);
576             AggregateResponse response = rpcCallback.get();
577             if (controller.failedOnException()) {
578               throw controller.getFailedOn();
579             }
580             Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
581             if (response.getFirstPartCount() == 0) {
582               return pair;
583             }
584             List<S> list = new ArrayList<S>();
585             for (int i = 0; i < response.getFirstPartCount(); i++) {
586               ByteString b = response.getFirstPart(i);
587               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
588               S s = ci.getPromotedValueFromProto(t);
589               list.add(s);
590             }
591             pair.setFirst(list);
592             ByteBuffer bb = ByteBuffer.allocate(8).put(
593                 getBytesFromResponse(response.getSecondPart()));
594             bb.rewind();
595             pair.setSecond(bb.getLong());
596             return pair;
597           }
598         }, stdCallback);
599     return stdCallback.getStdParams();
600   }
601 
602   /**
603    * This is the client side interface/handle for calling the std method for a
604    * given cf-cq combination. It was necessary to add one more call stack as its
605    * return type should be a decimal value, irrespective of what
606    * columninterpreter says. So, this methods collects the necessary parameters
607    * to compute the std and returns the double value.
608    * @param tableName
609    * @param ci
610    * @param scan
611    * @return <R, S>
612    * @throws Throwable
613    */
614   public <R, S, P extends Message, Q extends Message, T extends Message>
615   double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
616       Scan scan) throws Throwable {
617     HTable table = null;
618     try {
619       table = new HTable(conf, tableName);
620       return std(table, ci, scan);
621     } finally {
622       if (table != null) {
623         table.close();
624       }
625     }
626   }
627 
628   /**
629    * This is the client side interface/handle for calling the std method for a
630    * given cf-cq combination. It was necessary to add one more call stack as its
631    * return type should be a decimal value, irrespective of what
632    * columninterpreter says. So, this methods collects the necessary parameters
633    * to compute the std and returns the double value.
634    * @param table
635    * @param ci
636    * @param scan
637    * @return <R, S>
638    * @throws Throwable
639    */
640   public <R, S, P extends Message, Q extends Message, T extends Message> double std(
641       final HTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
642     Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
643     double res = 0d;
644     double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
645     double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
646     res = avgOfSumSq - (avg) * (avg); // variance
647     res = Math.pow(res, 0.5);
648     return res;
649   }
650 
651   /**
652    * It helps locate the region with median for a given column whose weight 
653    * is specified in an optional column.
654    * From individual regions, it obtains sum of values and sum of weights.
655    * @param table
656    * @param ci
657    * @param scan
658    * @return pair whose first element is a map between start row of the region
659    *  and (sum of values, sum of weights) for the region, the second element is
660    *  (sum of values, sum of weights) for all the regions chosen
661    * @throws Throwable
662    */
663   private <R, S, P extends Message, Q extends Message, T extends Message>
664   Pair<NavigableMap<byte[], List<S>>, List<S>>
665   getMedianArgs(final HTable table,
666       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
667     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
668     final NavigableMap<byte[], List<S>> map =
669       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
670     class StdCallback implements Batch.Callback<List<S>> {
671       S sumVal = null, sumWeights = null;
672 
673       public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
674         List<S> l = new ArrayList<S>();
675         l.add(sumVal);
676         l.add(sumWeights);
677         Pair<NavigableMap<byte[], List<S>>, List<S>> p =
678           new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
679         return p;
680       }
681 
682       @Override
683       public synchronized void update(byte[] region, byte[] row, List<S> result) {
684         map.put(row, result);
685         sumVal = ci.add(sumVal, result.get(0));
686         sumWeights = ci.add(sumWeights, result.get(1));
687       }
688     }
689     StdCallback stdCallback = new StdCallback();
690     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
691         new Batch.Call<AggregateService, List<S>>() {
692           @Override
693           public List<S> call(AggregateService instance) throws IOException {
694             ServerRpcController controller = new ServerRpcController();
695             BlockingRpcCallback<AggregateResponse> rpcCallback = 
696                 new BlockingRpcCallback<AggregateResponse>();
697             instance.getMedian(controller, requestArg, rpcCallback);
698             AggregateResponse response = rpcCallback.get();
699             if (controller.failedOnException()) {
700               throw controller.getFailedOn();
701             }
702 
703             List<S> list = new ArrayList<S>();
704             for (int i = 0; i < response.getFirstPartCount(); i++) {
705               ByteString b = response.getFirstPart(i);
706               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
707               S s = ci.getPromotedValueFromProto(t);
708               list.add(s);
709             }
710             return list;
711           }
712 
713         }, stdCallback);
714     return stdCallback.getMedianParams();
715   }
716 
717   /**
718    * This is the client side interface/handler for calling the median method for a
719    * given cf-cq combination. This method collects the necessary parameters
720    * to compute the median and returns the median.
721    * @param tableName
722    * @param ci
723    * @param scan
724    * @return R the median
725    * @throws Throwable
726    */
727   public <R, S, P extends Message, Q extends Message, T extends Message>
728   R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
729       Scan scan) throws Throwable {
730     HTable table = null;
731     try {
732       table = new HTable(conf, tableName);
733       return median(table, ci, scan);
734     } finally {
735       if (table != null) {
736         table.close();
737       }
738     }
739   }
740 
741   /**
742    * This is the client side interface/handler for calling the median method for a
743    * given cf-cq combination. This method collects the necessary parameters
744    * to compute the median and returns the median.
745    * @param table
746    * @param ci
747    * @param scan
748    * @return R the median
749    * @throws Throwable
750    */
751   public <R, S, P extends Message, Q extends Message, T extends Message>
752   R median(final HTable table, ColumnInterpreter<R, S, P, Q, T> ci,
753       Scan scan) throws Throwable {
754     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
755     byte[] startRow = null;
756     byte[] colFamily = scan.getFamilies()[0];
757     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
758     NavigableMap<byte[], List<S>> map = p.getFirst();
759     S sumVal = p.getSecond().get(0);
760     S sumWeights = p.getSecond().get(1);
761     double halfSumVal = ci.divideForAvg(sumVal, 2L);
762     double movingSumVal = 0;
763     boolean weighted = false;
764     if (quals.size() > 1) {
765       weighted = true;
766       halfSumVal = ci.divideForAvg(sumWeights, 2L);
767     }
768     
769     for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
770       S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
771       double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
772       if (newSumVal > halfSumVal) break;  // we found the region with the median
773       movingSumVal = newSumVal;
774       startRow = entry.getKey();
775     }
776     // scan the region with median and find it
777     Scan scan2 = new Scan(scan);
778     // inherit stop row from method parameter
779     if (startRow != null) scan2.setStartRow(startRow);
780     ResultScanner scanner = null;
781     try {
782       int cacheSize = scan2.getCaching();
783       if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
784         scan2.setCacheBlocks(true);
785         cacheSize = 5;
786         scan2.setCaching(cacheSize);
787       }
788       scanner = table.getScanner(scan2);
789       Result[] results = null;
790       byte[] qualifier = quals.pollFirst();
791       // qualifier for the weight column
792       byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
793       R value = null;
794       do {
795         results = scanner.next(cacheSize);
796         if (results != null && results.length > 0) {
797           for (int i = 0; i < results.length; i++) {
798             Result r = results[i];
799             // retrieve weight
800             Cell kv = r.getColumnLatest(colFamily, weightQualifier);
801             R newValue = ci.getValue(colFamily, weightQualifier, kv);
802             S s = ci.castToReturnType(newValue);
803             double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
804             // see if we have moved past the median
805             if (newSumVal > halfSumVal) {
806               return value;
807             }
808             movingSumVal = newSumVal;
809             kv = r.getColumnLatest(colFamily, qualifier);
810             value = ci.getValue(colFamily, qualifier, kv);
811             }
812           }
813       } while (results != null && results.length > 0);
814     } finally {
815       if (scanner != null) {
816         scanner.close();
817       }
818     }
819     return null;
820   }
821 
822   <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest 
823   validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
824       throws IOException {
825     validateParameters(scan, canFamilyBeAbsent);
826     final AggregateRequest.Builder requestBuilder = 
827         AggregateRequest.newBuilder();
828     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
829     P columnInterpreterSpecificData = null;
830     if ((columnInterpreterSpecificData = ci.getRequestData()) 
831        != null) {
832       requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
833     }
834     requestBuilder.setScan(ProtobufUtil.toScan(scan));
835     return requestBuilder.build();
836   }
837 
838   byte[] getBytesFromResponse(ByteString response) {
839     ByteBuffer bb = response.asReadOnlyByteBuffer();
840     bb.rewind();
841     byte[] bytes;
842     if (bb.hasArray()) {
843       bytes = bb.array();
844     } else {
845       bytes = response.toByteArray();
846     }
847     return bytes;
848   }
849 }