View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client.coprocessor;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.NavigableSet;
29  import java.util.TreeMap;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.HTable;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.client.Table;
44  import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
45  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
46  import org.apache.hadoop.hbase.ipc.ServerRpcController;
47  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
48  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
49  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
50  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
51  import org.apache.hadoop.hbase.util.Bytes;
52  import org.apache.hadoop.hbase.util.Pair;
53  
54  import com.google.protobuf.ByteString;
55  import com.google.protobuf.Message;
56  
57  /**
58   * This client class is for invoking the aggregate functions deployed on the
59   * Region Server side via the AggregateService. This class will implement the
60   * supporting functionality for summing/processing the individual results
61   * obtained from the AggregateService for each region.
62   * <p>
63   * This will serve as the client side handler for invoking the aggregate
64   * functions.
65   * For all aggregate functions,
66   * <ul>
67   * <li>start row &lt; end row is an essential condition (if they are not
68   * {@link HConstants#EMPTY_BYTE_ARRAY})
69   * <li>Column family can't be null. In case where multiple families are
70   * provided, an IOException will be thrown. An optional column qualifier can
71   * also be defined.
72   * <li>For methods to find maximum, minimum and sum, it returns the
73   * parameter type. For average and std, it returns a double value. For row
74   * count, it returns a long value.</ul>
75   */
76  @InterfaceAudience.Private
77  public class AggregationClient {
78  
/** Logger shared by all instances of this client. */
private static final Log log = LogFactory.getLog(AggregationClient.class);

/** Configuration passed to every {@link HTable} this client opens by table name. */
Configuration conf;

/**
 * Creates a client that uses the supplied configuration.
 * @param cfg configuration used when a table has to be opened by name
 */
public AggregationClient(Configuration cfg) {
  conf = cfg;
}
89  
90    /**
91     * It gives the maximum value of a column for a given column family for the
92     * given range. In case qualifier is null, a max of all values for the given
93     * family is returned.
94     * @param tableName
95     * @param ci
96     * @param scan
97     * @return max val <R>
98     * @throws Throwable
99     *           The caller is supposed to handle the exception as they are thrown
100    *           & propagated to it.
101    */
102   public <R, S, P extends Message, Q extends Message, T extends Message> R max(
103       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
104       throws Throwable {
105     Table table = null;
106     try {
107       table = new HTable(conf, tableName);
108       return max(table, ci, scan);
109     } finally {
110       if (table != null) {
111         table.close();
112       }
113     }
114   }
115 
116   /**
117    * It gives the maximum value of a column for a given column family for the
118    * given range. In case qualifier is null, a max of all values for the given
119    * family is returned.
120    * @param table
121    * @param ci
122    * @param scan
123    * @return max val <R>
124    * @throws Throwable
125    *           The caller is supposed to handle the exception as they are thrown
126    *           & propagated to it.
127    */
128   public <R, S, P extends Message, Q extends Message, T extends Message> 
129   R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
130       final Scan scan) throws Throwable {
131     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
132     class MaxCallBack implements Batch.Callback<R> {
133       R max = null;
134 
135       R getMax() {
136         return max;
137       }
138 
139       @Override
140       public synchronized void update(byte[] region, byte[] row, R result) {
141         max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
142       }
143     }
144     MaxCallBack aMaxCallBack = new MaxCallBack();
145     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
146         new Batch.Call<AggregateService, R>() {
147           @Override
148           public R call(AggregateService instance) throws IOException {
149             ServerRpcController controller = new ServerRpcController();
150             BlockingRpcCallback<AggregateResponse> rpcCallback = 
151                 new BlockingRpcCallback<AggregateResponse>();
152             instance.getMax(controller, requestArg, rpcCallback);
153             AggregateResponse response = rpcCallback.get();
154             if (controller.failedOnException()) {
155               throw controller.getFailedOn();
156             }
157             if (response.getFirstPartCount() > 0) {
158               ByteString b = response.getFirstPart(0);
159               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
160               return ci.getCellValueFromProto(q);
161             }
162             return null;
163           }
164         }, aMaxCallBack);
165     return aMaxCallBack.getMax();
166   }
167 
168   /*
169    * @param scan
170    * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
171    */
172   private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
173     if (scan == null
174         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
175             .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
176         || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
177         	!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
178       throw new IOException(
179           "Agg client Exception: Startrow should be smaller than Stoprow");
180     } else if (!canFamilyBeAbsent) {
181       if (scan.getFamilyMap().size() != 1) {
182         throw new IOException("There must be only one family.");
183       }
184     }
185   }
186 
187   /**
188    * It gives the minimum value of a column for a given column family for the
189    * given range. In case qualifier is null, a min of all values for the given
190    * family is returned.
191    * @param tableName
192    * @param ci
193    * @param scan
194    * @return min val <R>
195    * @throws Throwable
196    */
197   public <R, S, P extends Message, Q extends Message, T extends Message> R min(
198       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
199       throws Throwable {
200     Table table = null;
201     try {
202       table = new HTable(conf, tableName);
203       return min(table, ci, scan);
204     } finally {
205       if (table != null) {
206         table.close();
207       }
208     }
209   }
210 
211   /**
212    * It gives the minimum value of a column for a given column family for the
213    * given range. In case qualifier is null, a min of all values for the given
214    * family is returned.
215    * @param table
216    * @param ci
217    * @param scan
218    * @return min val <R>
219    * @throws Throwable
220    */
221   public <R, S, P extends Message, Q extends Message, T extends Message> 
222   R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
223       final Scan scan) throws Throwable {
224     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
225     class MinCallBack implements Batch.Callback<R> {
226 
227       private R min = null;
228 
229       public R getMinimum() {
230         return min;
231       }
232 
233       @Override
234       public synchronized void update(byte[] region, byte[] row, R result) {
235         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
236       }
237     }
238     MinCallBack minCallBack = new MinCallBack();
239     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
240         new Batch.Call<AggregateService, R>() {
241 
242           @Override
243           public R call(AggregateService instance) throws IOException {
244             ServerRpcController controller = new ServerRpcController();
245             BlockingRpcCallback<AggregateResponse> rpcCallback = 
246                 new BlockingRpcCallback<AggregateResponse>();
247             instance.getMin(controller, requestArg, rpcCallback);
248             AggregateResponse response = rpcCallback.get();
249             if (controller.failedOnException()) {
250               throw controller.getFailedOn();
251             }
252             if (response.getFirstPartCount() > 0) {
253               ByteString b = response.getFirstPart(0);
254               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
255               return ci.getCellValueFromProto(q);
256             }
257             return null;
258           }
259         }, minCallBack);
260     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
261     return minCallBack.getMinimum();
262   }
263 
264   /**
265    * It gives the row count, by summing up the individual results obtained from
266    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
267    * optimise the operation. In case qualifier is provided, I can't use the
268    * filter as it may set the flag to skip to next row, but the value read is
269    * not of the given filter: in this case, this particular row will not be
270    * counted ==> an error.
271    * @param tableName
272    * @param ci
273    * @param scan
274    * @return the row count summed over the per-region results
275    * @throws Throwable
276    */
277   public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
278       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
279       throws Throwable {
280     Table table = null;
281     try {
282       table = new HTable(conf, tableName);
283       return rowCount(table, ci, scan);
284     } finally {
285       if (table != null) {
286         table.close();
287       }
288     }
289   }
290 
291   /**
292    * It gives the row count, by summing up the individual results obtained from
293    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
294    * optimise the operation. In case qualifier is provided, I can't use the
295    * filter as it may set the flag to skip to next row, but the value read is
296    * not of the given filter: in this case, this particular row will not be
297    * counted ==> an error.
298    * @param table
299    * @param ci
300    * @param scan
301    * @return the row count summed over the per-region results
302    * @throws Throwable
303    */
304   public <R, S, P extends Message, Q extends Message, T extends Message> 
305   long rowCount(final Table table,
306       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
307     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
308     class RowNumCallback implements Batch.Callback<Long> {
309       private final AtomicLong rowCountL = new AtomicLong(0);
310 
311       public long getRowNumCount() {
312         return rowCountL.get();
313       }
314 
315       @Override
316       public void update(byte[] region, byte[] row, Long result) {
317         rowCountL.addAndGet(result.longValue());
318       }
319     }
320     RowNumCallback rowNum = new RowNumCallback();
321     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
322         new Batch.Call<AggregateService, Long>() {
323           @Override
324           public Long call(AggregateService instance) throws IOException {
325             ServerRpcController controller = new ServerRpcController();
326             BlockingRpcCallback<AggregateResponse> rpcCallback = 
327                 new BlockingRpcCallback<AggregateResponse>();
328             instance.getRowNum(controller, requestArg, rpcCallback);
329             AggregateResponse response = rpcCallback.get();
330             if (controller.failedOnException()) {
331               throw controller.getFailedOn();
332             }
333             byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
334             ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
335             bb.rewind();
336             return bb.getLong();
337           }
338         }, rowNum);
339     return rowNum.getRowNumCount();
340   }
341 
342   /**
343    * It sums up the value returned from various regions. In case qualifier is
344    * null, summation of all the column qualifiers in the given family is done.
345    * @param tableName
346    * @param ci
347    * @param scan
348    * @return sum <S>
349    * @throws Throwable
350    */
351   public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
352       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
353       throws Throwable {
354     Table table = null;
355     try {
356       table = new HTable(conf, tableName);
357       return sum(table, ci, scan);
358     } finally {
359       if (table != null) {
360         table.close();
361       }
362     }
363   }
364 
365   /**
366    * It sums up the value returned from various regions. In case qualifier is
367    * null, summation of all the column qualifiers in the given family is done.
368    * @param table
369    * @param ci
370    * @param scan
371    * @return sum <S>
372    * @throws Throwable
373    */
374   public <R, S, P extends Message, Q extends Message, T extends Message> 
375   S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
376       final Scan scan) throws Throwable {
377     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
378     
379     class SumCallBack implements Batch.Callback<S> {
380       S sumVal = null;
381 
382       public S getSumResult() {
383         return sumVal;
384       }
385 
386       @Override
387       public synchronized void update(byte[] region, byte[] row, S result) {
388         sumVal = ci.add(sumVal, result);
389       }
390     }
391     SumCallBack sumCallBack = new SumCallBack();
392     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
393         new Batch.Call<AggregateService, S>() {
394           @Override
395           public S call(AggregateService instance) throws IOException {
396             ServerRpcController controller = new ServerRpcController();
397             BlockingRpcCallback<AggregateResponse> rpcCallback = 
398                 new BlockingRpcCallback<AggregateResponse>();
399             instance.getSum(controller, requestArg, rpcCallback);
400             AggregateResponse response = rpcCallback.get();
401             if (controller.failedOnException()) {
402               throw controller.getFailedOn();
403             }
404             if (response.getFirstPartCount() == 0) {
405               return null;
406             }
407             ByteString b = response.getFirstPart(0);
408             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
409             S s = ci.getPromotedValueFromProto(t);
410             return s;
411           }
412         }, sumCallBack);
413     return sumCallBack.getSumResult();
414   }
415 
416   /**
417    * It computes average while fetching sum and row count from all the
418    * corresponding regions. Approach is to compute a global sum of region level
419    * sum and rowcount and then compute the average.
420    * @param tableName
421    * @param scan
422    * @throws Throwable
423    */
424   private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
425       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
426       throws Throwable {
427     Table table = null;
428     try {
429       table = new HTable(conf, tableName);
430       return getAvgArgs(table, ci, scan);
431     } finally {
432       if (table != null) {
433         table.close();
434       }
435     }
436   }
437 
438   /**
439    * It computes average while fetching sum and row count from all the
440    * corresponding regions. Approach is to compute a global sum of region level
441    * sum and rowcount and then compute the average.
442    * @param table
443    * @param scan
444    * @throws Throwable
445    */
446   private <R, S, P extends Message, Q extends Message, T extends Message>
447   Pair<S, Long> getAvgArgs(final Table table,
448       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
449     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
450     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
451       S sum = null;
452       Long rowCount = 0l;
453 
454       public synchronized Pair<S, Long> getAvgArgs() {
455         return new Pair<S, Long>(sum, rowCount);
456       }
457 
458       @Override
459       public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
460         sum = ci.add(sum, result.getFirst());
461         rowCount += result.getSecond();
462       }
463     }
464     AvgCallBack avgCallBack = new AvgCallBack();
465     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
466         new Batch.Call<AggregateService, Pair<S, Long>>() {
467           @Override
468           public Pair<S, Long> call(AggregateService instance) throws IOException {
469             ServerRpcController controller = new ServerRpcController();
470             BlockingRpcCallback<AggregateResponse> rpcCallback = 
471                 new BlockingRpcCallback<AggregateResponse>();
472             instance.getAvg(controller, requestArg, rpcCallback);
473             AggregateResponse response = rpcCallback.get();
474             if (controller.failedOnException()) {
475               throw controller.getFailedOn();
476             }
477             Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
478             if (response.getFirstPartCount() == 0) {
479               return pair;
480             }
481             ByteString b = response.getFirstPart(0);
482             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
483             S s = ci.getPromotedValueFromProto(t);
484             pair.setFirst(s);
485             ByteBuffer bb = ByteBuffer.allocate(8).put(
486                 getBytesFromResponse(response.getSecondPart()));
487             bb.rewind();
488             pair.setSecond(bb.getLong());
489             return pair;
490           }
491         }, avgCallBack);
492     return avgCallBack.getAvgArgs();
493   }
494 
495   /**
496    * This is the client side interface/handle for calling the average method for
497    * a given cf-cq combination. It was necessary to add one more call stack as
498    * its return type should be a decimal value, irrespective of what
499    * columninterpreter says. So, this method collects the necessary parameters
500    * to compute the average and returns the double value.
501    * @param tableName
502    * @param ci
503    * @param scan
504    * @return the average as a double
505    * @throws Throwable
506    */
507   public <R, S, P extends Message, Q extends Message, T extends Message>
508   double avg(final TableName tableName,
509       final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
510     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
511     return ci.divideForAvg(p.getFirst(), p.getSecond());
512   }
513 
514   /**
515    * This is the client side interface/handle for calling the average method for
516    * a given cf-cq combination. It was necessary to add one more call stack as
517    * its return type should be a decimal value, irrespective of what
518    * columninterpreter says. So, this method collects the necessary parameters
519    * to compute the average and returns the double value.
520    * @param table
521    * @param ci
522    * @param scan
523    * @return the average as a double
524    * @throws Throwable
525    */
526   public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
527       final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
528     Pair<S, Long> p = getAvgArgs(table, ci, scan);
529     return ci.divideForAvg(p.getFirst(), p.getSecond());
530   }
531 
532   /**
533    * It computes a global standard deviation for a given column and its value.
534    * Standard deviation is square root of (average of squares -
535    * average*average). From individual regions, it obtains sum, square sum and
536    * number of rows. With these, the above values are computed to get the global
537    * std.
538    * @param table
539    * @param scan
540    * @return pair of (list holding the sum and the sum of squares, row count)
541    * @throws Throwable
542    */
543   private <R, S, P extends Message, Q extends Message, T extends Message>
544   Pair<List<S>, Long> getStdArgs(final Table table,
545       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
546     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
547     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
548       long rowCountVal = 0l;
549       S sumVal = null, sumSqVal = null;
550 
551       public synchronized Pair<List<S>, Long> getStdParams() {
552         List<S> l = new ArrayList<S>();
553         l.add(sumVal);
554         l.add(sumSqVal);
555         Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
556         return p;
557       }
558 
559       @Override
560       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
561         if (result.getFirst().size() > 0) {
562           sumVal = ci.add(sumVal, result.getFirst().get(0));
563           sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
564           rowCountVal += result.getSecond();
565         }
566       }
567     }
568     StdCallback stdCallback = new StdCallback();
569     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
570         new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
571           @Override
572           public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
573             ServerRpcController controller = new ServerRpcController();
574             BlockingRpcCallback<AggregateResponse> rpcCallback = 
575                 new BlockingRpcCallback<AggregateResponse>();
576             instance.getStd(controller, requestArg, rpcCallback);
577             AggregateResponse response = rpcCallback.get();
578             if (controller.failedOnException()) {
579               throw controller.getFailedOn();
580             }
581             Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
582             if (response.getFirstPartCount() == 0) {
583               return pair;
584             }
585             List<S> list = new ArrayList<S>();
586             for (int i = 0; i < response.getFirstPartCount(); i++) {
587               ByteString b = response.getFirstPart(i);
588               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
589               S s = ci.getPromotedValueFromProto(t);
590               list.add(s);
591             }
592             pair.setFirst(list);
593             ByteBuffer bb = ByteBuffer.allocate(8).put(
594                 getBytesFromResponse(response.getSecondPart()));
595             bb.rewind();
596             pair.setSecond(bb.getLong());
597             return pair;
598           }
599         }, stdCallback);
600     return stdCallback.getStdParams();
601   }
602 
603   /**
604    * This is the client side interface/handle for calling the std method for a
605    * given cf-cq combination. It was necessary to add one more call stack as its
606    * return type should be a decimal value, irrespective of what
607    * columninterpreter says. So, this method collects the necessary parameters
608    * to compute the std and returns the double value.
609    * @param tableName
610    * @param ci
611    * @param scan
612    * @return the standard deviation as a double
613    * @throws Throwable
614    */
615   public <R, S, P extends Message, Q extends Message, T extends Message>
616   double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
617       Scan scan) throws Throwable {
618     Table table = null;
619     try {
620       table = new HTable(conf, tableName);
621       return std(table, ci, scan);
622     } finally {
623       if (table != null) {
624         table.close();
625       }
626     }
627   }
628 
629   /**
630    * This is the client side interface/handle for calling the std method for a
631    * given cf-cq combination. It was necessary to add one more call stack as its
632    * return type should be a decimal value, irrespective of what
633    * columninterpreter says. So, this method collects the necessary parameters
634    * to compute the std and returns the double value.
635    * @param table
636    * @param ci
637    * @param scan
638    * @return the standard deviation as a double
639    * @throws Throwable
640    */
641   public <R, S, P extends Message, Q extends Message, T extends Message> double std(
642       final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
643     Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
644     double res = 0d;
645     double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
646     double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
647     res = avgOfSumSq - (avg) * (avg); // variance
648     res = Math.pow(res, 0.5);
649     return res;
650   }
651 
652   /**
653    * It helps locate the region with median for a given column whose weight 
654    * is specified in an optional column.
655    * From individual regions, it obtains sum of values and sum of weights.
656    * @param table
657    * @param ci
658    * @param scan
659    * @return pair whose first element is a map between start row of the region
660    *  and (sum of values, sum of weights) for the region, the second element is
661    *  (sum of values, sum of weights) for all the regions chosen
662    * @throws Throwable
663    */
664   private <R, S, P extends Message, Q extends Message, T extends Message>
665   Pair<NavigableMap<byte[], List<S>>, List<S>>
666   getMedianArgs(final Table table,
667       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
668     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
669     final NavigableMap<byte[], List<S>> map =
670       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
671     class StdCallback implements Batch.Callback<List<S>> {
672       S sumVal = null, sumWeights = null;
673 
674       public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
675         List<S> l = new ArrayList<S>();
676         l.add(sumVal);
677         l.add(sumWeights);
678         Pair<NavigableMap<byte[], List<S>>, List<S>> p =
679           new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
680         return p;
681       }
682 
683       @Override
684       public synchronized void update(byte[] region, byte[] row, List<S> result) {
685         map.put(row, result);
686         sumVal = ci.add(sumVal, result.get(0));
687         sumWeights = ci.add(sumWeights, result.get(1));
688       }
689     }
690     StdCallback stdCallback = new StdCallback();
691     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
692         new Batch.Call<AggregateService, List<S>>() {
693           @Override
694           public List<S> call(AggregateService instance) throws IOException {
695             ServerRpcController controller = new ServerRpcController();
696             BlockingRpcCallback<AggregateResponse> rpcCallback = 
697                 new BlockingRpcCallback<AggregateResponse>();
698             instance.getMedian(controller, requestArg, rpcCallback);
699             AggregateResponse response = rpcCallback.get();
700             if (controller.failedOnException()) {
701               throw controller.getFailedOn();
702             }
703 
704             List<S> list = new ArrayList<S>();
705             for (int i = 0; i < response.getFirstPartCount(); i++) {
706               ByteString b = response.getFirstPart(i);
707               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
708               S s = ci.getPromotedValueFromProto(t);
709               list.add(s);
710             }
711             return list;
712           }
713 
714         }, stdCallback);
715     return stdCallback.getMedianParams();
716   }
717 
718   /**
719    * This is the client side interface/handler for calling the median method for a
720    * given cf-cq combination. This method collects the necessary parameters
721    * to compute the median and returns the median.
722    * @param tableName
723    * @param ci
724    * @param scan
725    * @return R the median
726    * @throws Throwable
727    */
728   public <R, S, P extends Message, Q extends Message, T extends Message>
729   R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
730       Scan scan) throws Throwable {
731     Table table = null;
732     try {
733       table = new HTable(conf, tableName);
734       return median(table, ci, scan);
735     } finally {
736       if (table != null) {
737         table.close();
738       }
739     }
740   }
741 
742   /**
743    * This is the client side interface/handler for calling the median method for a
744    * given cf-cq combination. This method collects the necessary parameters
745    * to compute the median and returns the median.
746    * @param table
747    * @param ci
748    * @param scan
749    * @return R the median
750    * @throws Throwable
751    */
752   public <R, S, P extends Message, Q extends Message, T extends Message>
753   R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
754       Scan scan) throws Throwable {
755     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
756     byte[] startRow = null;
757     byte[] colFamily = scan.getFamilies()[0];
758     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
759     NavigableMap<byte[], List<S>> map = p.getFirst();
760     S sumVal = p.getSecond().get(0);
761     S sumWeights = p.getSecond().get(1);
762     double halfSumVal = ci.divideForAvg(sumVal, 2L);
763     double movingSumVal = 0;
764     boolean weighted = false;
765     if (quals.size() > 1) {
766       weighted = true;
767       halfSumVal = ci.divideForAvg(sumWeights, 2L);
768     }
769     
770     for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
771       S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
772       double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
773       if (newSumVal > halfSumVal) break;  // we found the region with the median
774       movingSumVal = newSumVal;
775       startRow = entry.getKey();
776     }
777     // scan the region with median and find it
778     Scan scan2 = new Scan(scan);
779     // inherit stop row from method parameter
780     if (startRow != null) scan2.setStartRow(startRow);
781     ResultScanner scanner = null;
782     try {
783       int cacheSize = scan2.getCaching();
784       if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
785         scan2.setCacheBlocks(true);
786         cacheSize = 5;
787         scan2.setCaching(cacheSize);
788       }
789       scanner = table.getScanner(scan2);
790       Result[] results = null;
791       byte[] qualifier = quals.pollFirst();
792       // qualifier for the weight column
793       byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
794       R value = null;
795       do {
796         results = scanner.next(cacheSize);
797         if (results != null && results.length > 0) {
798           for (int i = 0; i < results.length; i++) {
799             Result r = results[i];
800             // retrieve weight
801             Cell kv = r.getColumnLatest(colFamily, weightQualifier);
802             R newValue = ci.getValue(colFamily, weightQualifier, kv);
803             S s = ci.castToReturnType(newValue);
804             double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
805             // see if we have moved past the median
806             if (newSumVal > halfSumVal) {
807               return value;
808             }
809             movingSumVal = newSumVal;
810             kv = r.getColumnLatest(colFamily, qualifier);
811             value = ci.getValue(colFamily, qualifier, kv);
812             }
813           }
814       } while (results != null && results.length > 0);
815     } finally {
816       if (scanner != null) {
817         scanner.close();
818       }
819     }
820     return null;
821   }
822 
823   <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest 
824   validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
825       throws IOException {
826     validateParameters(scan, canFamilyBeAbsent);
827     final AggregateRequest.Builder requestBuilder = 
828         AggregateRequest.newBuilder();
829     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
830     P columnInterpreterSpecificData = null;
831     if ((columnInterpreterSpecificData = ci.getRequestData()) 
832        != null) {
833       requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
834     }
835     requestBuilder.setScan(ProtobufUtil.toScan(scan));
836     return requestBuilder.build();
837   }
838 
839   byte[] getBytesFromResponse(ByteString response) {
840     ByteBuffer bb = response.asReadOnlyByteBuffer();
841     bb.rewind();
842     byte[] bytes;
843     if (bb.hasArray()) {
844       bytes = bb.array();
845     } else {
846       bytes = response.toByteArray();
847     }
848     return bytes;
849   }
850 }