1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client.coprocessor;
21  
22  import com.google.protobuf.ByteString;
23  import com.google.protobuf.Message;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.classification.InterfaceStability;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.KeyValue;
31  import org.apache.hadoop.hbase.client.HTable;
32  import org.apache.hadoop.hbase.client.Result;
33  import org.apache.hadoop.hbase.client.ResultScanner;
34  import org.apache.hadoop.hbase.client.Scan;
35  import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
36  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
37  import org.apache.hadoop.hbase.ipc.ServerRpcController;
38  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateArgument;
40  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
41  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.Pair;
44  
45  import java.io.IOException;
46  import java.nio.ByteBuffer;
47  import java.util.ArrayList;
48  import java.util.List;
49  import java.util.Map;
50  import java.util.NavigableMap;
51  import java.util.NavigableSet;
52  import java.util.TreeMap;
53  import java.util.concurrent.atomic.AtomicLong;
54  
/**
 * This client class is for invoking the aggregate functions deployed on the
 * Region Server side via the AggregateService. This class implements the
 * supporting functionality for summing/processing the individual results
 * obtained from the AggregateService for each region.
 * <p>
 * This will serve as the client side handler for invoking the aggregate
 * functions.
 * <p>
 * For all aggregate functions:
 * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
 * <li>Column family can't be null. In case where multiple families are
 * provided, an IOException will be thrown. An optional column qualifier can
 * also be defined.</li>
 * <li>The methods to find maximum, minimum, sum and median return a type
 * defined by the column interpreter. Average and std return a double value.
 * Row count returns a long value.</li>
 * </ul>
 */
74  @InterfaceAudience.Public
75  @InterfaceStability.Evolving
76  public class AggregationClient {
77  
78    private static final Log log = LogFactory.getLog(AggregationClient.class);
79    Configuration conf;
80  
81    /**
82     * Constructor with Conf object
83     * @param cfg
84     */
85    public AggregationClient(Configuration cfg) {
86      this.conf = cfg;
87    }
88  
89    /**
90     * It gives the maximum value of a column for a given column family for the
91     * given range. In case qualifier is null, a max of all values for the given
92     * family is returned.
93     * @param tableName
94     * @param ci
95     * @param scan
96     * @return max val <R>
97     * @throws Throwable
98     *           The caller is supposed to handle the exception as they are thrown
99     *           & propagated to it.
100    */
101   public <R, S, P extends Message, Q extends Message, T extends Message> R max(
102       final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
103       throws Throwable {
104     HTable table = null;
105     try {
106       table = new HTable(conf, tableName);
107       return max(table, ci, scan);
108     } finally {
109       if (table != null) {
110         table.close();
111       }
112     }
113   }
114 
115   /**
116    * It gives the maximum value of a column for a given column family for the
117    * given range. In case qualifier is null, a max of all values for the given
118    * family is returned.
119    * @param table
120    * @param ci
121    * @param scan
122    * @return max val <R>
123    * @throws Throwable
124    *           The caller is supposed to handle the exception as they are thrown
125    *           & propagated to it.
126    */
127   public <R, S, P extends Message, Q extends Message, T extends Message> 
128   R max(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
129       final Scan scan) throws Throwable {
130     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
131     class MaxCallBack implements Batch.Callback<R> {
132       R max = null;
133 
134       R getMax() {
135         return max;
136       }
137 
138       @Override
139       public synchronized void update(byte[] region, byte[] row, R result) {
140         max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
141       }
142     }
143     MaxCallBack aMaxCallBack = new MaxCallBack();
144     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
145         new Batch.Call<AggregateService, R>() {
146           @Override
147           public R call(AggregateService instance) throws IOException {
148             ServerRpcController controller = new ServerRpcController();
149             BlockingRpcCallback<AggregateResponse> rpcCallback = 
150                 new BlockingRpcCallback<AggregateResponse>();
151             instance.getMax(controller, requestArg, rpcCallback);
152             AggregateResponse response = rpcCallback.get();
153             if (controller.failedOnException()) {
154               throw controller.getFailedOn();
155             }
156             if (response.getFirstPartCount() > 0) {
157               ByteString b = response.getFirstPart(0);
158               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
159               return ci.getCellValueFromProto(q);
160             }
161             return null;
162           }
163         }, aMaxCallBack);
164     return aMaxCallBack.getMax();
165   }
166 
167   private void validateParameters(Scan scan) throws IOException {
168     if (scan == null
169         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
170             .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
171         || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
172         	!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
173       throw new IOException(
174           "Agg client Exception: Startrow should be smaller than Stoprow");
175     } else if (scan.getFamilyMap().size() != 1) {
176       throw new IOException("There must be only one family.");
177     }
178   }
179 
180   /**
181    * It gives the minimum value of a column for a given column family for the
182    * given range. In case qualifier is null, a min of all values for the given
183    * family is returned.
184    * @param tableName
185    * @param ci
186    * @param scan
187    * @return min val <R>
188    * @throws Throwable
189    */
190   public <R, S, P extends Message, Q extends Message, T extends Message> R min(
191       final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
192       throws Throwable {
193     HTable table = null;
194     try {
195       table = new HTable(conf, tableName);
196       return min(table, ci, scan);
197     } finally {
198       if (table != null) {
199         table.close();
200       }
201     }
202   }
203 
204   /**
205    * It gives the minimum value of a column for a given column family for the
206    * given range. In case qualifier is null, a min of all values for the given
207    * family is returned.
208    * @param table
209    * @param ci
210    * @param scan
211    * @return min val <R>
212    * @throws Throwable
213    */
214   public <R, S, P extends Message, Q extends Message, T extends Message> 
215   R min(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
216       final Scan scan) throws Throwable {
217     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
218     class MinCallBack implements Batch.Callback<R> {
219 
220       private R min = null;
221 
222       public R getMinimum() {
223         return min;
224       }
225 
226       @Override
227       public synchronized void update(byte[] region, byte[] row, R result) {
228         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
229       }
230     }
231     MinCallBack minCallBack = new MinCallBack();
232     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
233         new Batch.Call<AggregateService, R>() {
234 
235           @Override
236           public R call(AggregateService instance) throws IOException {
237             ServerRpcController controller = new ServerRpcController();
238             BlockingRpcCallback<AggregateResponse> rpcCallback = 
239                 new BlockingRpcCallback<AggregateResponse>();
240             instance.getMin(controller, requestArg, rpcCallback);
241             AggregateResponse response = rpcCallback.get();
242             if (controller.failedOnException()) {
243               throw controller.getFailedOn();
244             }
245             if (response.getFirstPartCount() > 0) {
246               ByteString b = response.getFirstPart(0);
247               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
248               return ci.getCellValueFromProto(q);
249             }
250             return null;
251           }
252         }, minCallBack);
253     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
254     return minCallBack.getMinimum();
255   }
256 
257   /**
258    * It gives the row count, by summing up the individual results obtained from
259    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
260    * optimised the operation. In case qualifier is provided, I can't use the
261    * filter as it may set the flag to skip to next row, but the value read is
262    * not of the given filter: in this case, this particular row will not be
263    * counted ==> an error.
264    * @param tableName
265    * @param ci
266    * @param scan
267    * @return <R, S>
268    * @throws Throwable
269    */
270   public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
271       final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
272       throws Throwable {
273     HTable table = null;
274     try {
275       table = new HTable(conf, tableName);
276       return rowCount(table, ci, scan);
277     } finally {
278       if (table != null) {
279         table.close();
280       }
281     }
282   }
283 
284   /**
285    * It gives the row count, by summing up the individual results obtained from
286    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
287    * optimised the operation. In case qualifier is provided, I can't use the
288    * filter as it may set the flag to skip to next row, but the value read is
289    * not of the given filter: in this case, this particular row will not be
290    * counted ==> an error.
291    * @param table
292    * @param ci
293    * @param scan
294    * @return <R, S>
295    * @throws Throwable
296    */
297   public <R, S, P extends Message, Q extends Message, T extends Message> 
298   long rowCount(final HTable table,
299       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
300     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
301     class RowNumCallback implements Batch.Callback<Long> {
302       private final AtomicLong rowCountL = new AtomicLong(0);
303 
304       public long getRowNumCount() {
305         return rowCountL.get();
306       }
307 
308       @Override
309       public void update(byte[] region, byte[] row, Long result) {
310         rowCountL.addAndGet(result.longValue());
311       }
312     }
313     RowNumCallback rowNum = new RowNumCallback();
314     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
315         new Batch.Call<AggregateService, Long>() {
316           @Override
317           public Long call(AggregateService instance) throws IOException {
318             ServerRpcController controller = new ServerRpcController();
319             BlockingRpcCallback<AggregateResponse> rpcCallback = 
320                 new BlockingRpcCallback<AggregateResponse>();
321             instance.getRowNum(controller, requestArg, rpcCallback);
322             AggregateResponse response = rpcCallback.get();
323             if (controller.failedOnException()) {
324               throw controller.getFailedOn();
325             }
326             byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
327             ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
328             bb.rewind();
329             return bb.getLong();
330           }
331         }, rowNum);
332     return rowNum.getRowNumCount();
333   }
334 
335   /**
336    * It sums up the value returned from various regions. In case qualifier is
337    * null, summation of all the column qualifiers in the given family is done.
338    * @param tableName
339    * @param ci
340    * @param scan
341    * @return sum <S>
342    * @throws Throwable
343    */
344   public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
345       final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
346       throws Throwable {
347     HTable table = null;
348     try {
349       table = new HTable(conf, tableName);
350       return sum(table, ci, scan);
351     } finally {
352       if (table != null) {
353         table.close();
354       }
355     }
356   }
357 
358   /**
359    * It sums up the value returned from various regions. In case qualifier is
360    * null, summation of all the column qualifiers in the given family is done.
361    * @param table
362    * @param ci
363    * @param scan
364    * @return sum <S>
365    * @throws Throwable
366    */
367   public <R, S, P extends Message, Q extends Message, T extends Message> 
368   S sum(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
369       final Scan scan) throws Throwable {
370     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
371     
372     class SumCallBack implements Batch.Callback<S> {
373       S sumVal = null;
374 
375       public S getSumResult() {
376         return sumVal;
377       }
378 
379       @Override
380       public synchronized void update(byte[] region, byte[] row, S result) {
381         sumVal = ci.add(sumVal, result);
382       }
383     }
384     SumCallBack sumCallBack = new SumCallBack();
385     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
386         new Batch.Call<AggregateService, S>() {
387           @Override
388           public S call(AggregateService instance) throws IOException {
389             ServerRpcController controller = new ServerRpcController();
390             BlockingRpcCallback<AggregateResponse> rpcCallback = 
391                 new BlockingRpcCallback<AggregateResponse>();
392             instance.getSum(controller, requestArg, rpcCallback);
393             AggregateResponse response = rpcCallback.get();
394             if (controller.failedOnException()) {
395               throw controller.getFailedOn();
396             }
397             if (response.getFirstPartCount() == 0) {
398               return null;
399             }
400             ByteString b = response.getFirstPart(0);
401             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
402             S s = ci.getPromotedValueFromProto(t);
403             return s;
404           }
405         }, sumCallBack);
406     return sumCallBack.getSumResult();
407   }
408 
409   /**
410    * It computes average while fetching sum and row count from all the
411    * corresponding regions. Approach is to compute a global sum of region level
412    * sum and rowcount and then compute the average.
413    * @param tableName
414    * @param scan
415    * @throws Throwable
416    */
417   private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
418       final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
419       throws Throwable {
420     HTable table = null;
421     try {
422       table = new HTable(conf, tableName);
423       return getAvgArgs(table, ci, scan);
424     } finally {
425       if (table != null) {
426         table.close();
427       }
428     }
429   }
430 
431   /**
432    * It computes average while fetching sum and row count from all the
433    * corresponding regions. Approach is to compute a global sum of region level
434    * sum and rowcount and then compute the average.
435    * @param table
436    * @param scan
437    * @throws Throwable
438    */
439   private <R, S, P extends Message, Q extends Message, T extends Message>
440   Pair<S, Long> getAvgArgs(final HTable table,
441       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
442     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
443     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
444       S sum = null;
445       Long rowCount = 0l;
446 
447       public Pair<S, Long> getAvgArgs() {
448         return new Pair<S, Long>(sum, rowCount);
449       }
450 
451       @Override
452       public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
453         sum = ci.add(sum, result.getFirst());
454         rowCount += result.getSecond();
455       }
456     }
457     AvgCallBack avgCallBack = new AvgCallBack();
458     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
459         new Batch.Call<AggregateService, Pair<S, Long>>() {
460           @Override
461           public Pair<S, Long> call(AggregateService instance) throws IOException {
462             ServerRpcController controller = new ServerRpcController();
463             BlockingRpcCallback<AggregateResponse> rpcCallback = 
464                 new BlockingRpcCallback<AggregateResponse>();
465             instance.getAvg(controller, requestArg, rpcCallback);
466             AggregateResponse response = rpcCallback.get();
467             if (controller.failedOnException()) {
468               throw controller.getFailedOn();
469             }
470             Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
471             if (response.getFirstPartCount() == 0) {
472               return pair;
473             }
474             ByteString b = response.getFirstPart(0);
475             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
476             S s = ci.getPromotedValueFromProto(t);
477             pair.setFirst(s);
478             ByteBuffer bb = ByteBuffer.allocate(8).put(
479                 getBytesFromResponse(response.getSecondPart()));
480             bb.rewind();
481             pair.setSecond(bb.getLong());
482             return pair;
483           }
484         }, avgCallBack);
485     return avgCallBack.getAvgArgs();
486   }
487 
488   /**
489    * This is the client side interface/handle for calling the average method for
490    * a given cf-cq combination. It was necessary to add one more call stack as
491    * its return type should be a decimal value, irrespective of what
492    * columninterpreter says. So, this methods collects the necessary parameters
493    * to compute the average and returs the double value.
494    * @param tableName
495    * @param ci
496    * @param scan
497    * @return <R, S>
498    * @throws Throwable
499    */
500   public <R, S, P extends Message, Q extends Message, T extends Message>
501   double avg(final byte[] tableName,
502       final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
503     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
504     return ci.divideForAvg(p.getFirst(), p.getSecond());
505   }
506 
507   /**
508    * This is the client side interface/handle for calling the average method for
509    * a given cf-cq combination. It was necessary to add one more call stack as
510    * its return type should be a decimal value, irrespective of what
511    * columninterpreter says. So, this methods collects the necessary parameters
512    * to compute the average and returs the double value.
513    * @param table
514    * @param ci
515    * @param scan
516    * @return <R, S>
517    * @throws Throwable
518    */
519   public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
520       final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
521     Pair<S, Long> p = getAvgArgs(table, ci, scan);
522     return ci.divideForAvg(p.getFirst(), p.getSecond());
523   }
524 
525   /**
526    * It computes a global standard deviation for a given column and its value.
527    * Standard deviation is square root of (average of squares -
528    * average*average). From individual regions, it obtains sum, square sum and
529    * number of rows. With these, the above values are computed to get the global
530    * std.
531    * @param table
532    * @param scan
533    * @return
534    * @throws Throwable
535    */
536   private <R, S, P extends Message, Q extends Message, T extends Message>
537   Pair<List<S>, Long> getStdArgs(final HTable table,
538       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
539     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
540     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
541       long rowCountVal = 0l;
542       S sumVal = null, sumSqVal = null;
543 
544       public Pair<List<S>, Long> getStdParams() {
545         List<S> l = new ArrayList<S>();
546         l.add(sumVal);
547         l.add(sumSqVal);
548         Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
549         return p;
550       }
551 
552       @Override
553       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
554         if (result.getFirst().size() > 0) {
555           sumVal = ci.add(sumVal, result.getFirst().get(0));
556           sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
557           rowCountVal += result.getSecond();
558         }
559       }
560     }
561     StdCallback stdCallback = new StdCallback();
562     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
563         new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
564           @Override
565           public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
566             ServerRpcController controller = new ServerRpcController();
567             BlockingRpcCallback<AggregateResponse> rpcCallback = 
568                 new BlockingRpcCallback<AggregateResponse>();
569             instance.getStd(controller, requestArg, rpcCallback);
570             AggregateResponse response = rpcCallback.get();
571             if (controller.failedOnException()) {
572               throw controller.getFailedOn();
573             }
574             Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
575             if (response.getFirstPartCount() == 0) {
576               return pair;
577             }
578             List<S> list = new ArrayList<S>();
579             for (int i = 0; i < response.getFirstPartCount(); i++) {
580               ByteString b = response.getFirstPart(i);
581               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
582               S s = ci.getPromotedValueFromProto(t);
583               list.add(s);
584             }
585             pair.setFirst(list);
586             ByteBuffer bb = ByteBuffer.allocate(8).put(
587                 getBytesFromResponse(response.getSecondPart()));
588             bb.rewind();
589             pair.setSecond(bb.getLong());
590             return pair;
591           }
592         }, stdCallback);
593     return stdCallback.getStdParams();
594   }
595 
596   /**
597    * This is the client side interface/handle for calling the std method for a
598    * given cf-cq combination. It was necessary to add one more call stack as its
599    * return type should be a decimal value, irrespective of what
600    * columninterpreter says. So, this methods collects the necessary parameters
601    * to compute the std and returns the double value.
602    * @param tableName
603    * @param ci
604    * @param scan
605    * @return <R, S>
606    * @throws Throwable
607    */
608   public <R, S, P extends Message, Q extends Message, T extends Message>
609   double std(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
610       Scan scan) throws Throwable {
611     HTable table = null;
612     try {
613       table = new HTable(conf, tableName);
614       return std(table, ci, scan);
615     } finally {
616       if (table != null) {
617         table.close();
618       }
619     }
620   }
621 
622   /**
623    * This is the client side interface/handle for calling the std method for a
624    * given cf-cq combination. It was necessary to add one more call stack as its
625    * return type should be a decimal value, irrespective of what
626    * columninterpreter says. So, this methods collects the necessary parameters
627    * to compute the std and returns the double value.
628    * @param table
629    * @param ci
630    * @param scan
631    * @return <R, S>
632    * @throws Throwable
633    */
634   public <R, S, P extends Message, Q extends Message, T extends Message> double std(
635       final HTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
636     Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
637     double res = 0d;
638     double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
639     double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
640     res = avgOfSumSq - (avg) * (avg); // variance
641     res = Math.pow(res, 0.5);
642     return res;
643   }
644 
645   /**
646    * It helps locate the region with median for a given column whose weight 
647    * is specified in an optional column.
648    * From individual regions, it obtains sum of values and sum of weights.
649    * @param table
650    * @param ci
651    * @param scan
652    * @return pair whose first element is a map between start row of the region
653    *  and (sum of values, sum of weights) for the region, the second element is
654    *  (sum of values, sum of weights) for all the regions chosen
655    * @throws Throwable
656    */
657   private <R, S, P extends Message, Q extends Message, T extends Message>
658   Pair<NavigableMap<byte[], List<S>>, List<S>>
659   getMedianArgs(final HTable table,
660       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
661     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
662     final NavigableMap<byte[], List<S>> map =
663       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
664     class StdCallback implements Batch.Callback<List<S>> {
665       S sumVal = null, sumWeights = null;
666 
667       public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
668         List<S> l = new ArrayList<S>();
669         l.add(sumVal);
670         l.add(sumWeights);
671         Pair<NavigableMap<byte[], List<S>>, List<S>> p =
672           new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
673         return p;
674       }
675 
676       @Override
677       public synchronized void update(byte[] region, byte[] row, List<S> result) {
678         map.put(row, result);
679         sumVal = ci.add(sumVal, result.get(0));
680         sumWeights = ci.add(sumWeights, result.get(1));
681       }
682     }
683     StdCallback stdCallback = new StdCallback();
684     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
685         new Batch.Call<AggregateService, List<S>>() {
686           @Override
687           public List<S> call(AggregateService instance) throws IOException {
688             ServerRpcController controller = new ServerRpcController();
689             BlockingRpcCallback<AggregateResponse> rpcCallback = 
690                 new BlockingRpcCallback<AggregateResponse>();
691             instance.getMedian(controller, requestArg, rpcCallback);
692             AggregateResponse response = rpcCallback.get();
693             if (controller.failedOnException()) {
694               throw controller.getFailedOn();
695             }
696 
697             List<S> list = new ArrayList<S>();
698             for (int i = 0; i < response.getFirstPartCount(); i++) {
699               ByteString b = response.getFirstPart(i);
700               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
701               S s = ci.getPromotedValueFromProto(t);
702               list.add(s);
703             }
704             return list;
705           }
706 
707         }, stdCallback);
708     return stdCallback.getMedianParams();
709   }
710 
711   /**
712    * This is the client side interface/handler for calling the median method for a
713    * given cf-cq combination. This method collects the necessary parameters
714    * to compute the median and returns the median.
715    * @param tableName
716    * @param ci
717    * @param scan
718    * @return R the median
719    * @throws Throwable
720    */
721   public <R, S, P extends Message, Q extends Message, T extends Message>
722   R median(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
723       Scan scan) throws Throwable {
724     HTable table = null;
725     try {
726       table = new HTable(conf, tableName);
727       return median(table, ci, scan);
728     } finally {
729       if (table != null) {
730         table.close();
731       }
732     }
733   }
734 
735   /**
736    * This is the client side interface/handler for calling the median method for a
737    * given cf-cq combination. This method collects the necessary parameters
738    * to compute the median and returns the median.
739    * @param table
740    * @param ci
741    * @param scan
742    * @return R the median
743    * @throws Throwable
744    */
745   public <R, S, P extends Message, Q extends Message, T extends Message>
746   R median(final HTable table, ColumnInterpreter<R, S, P, Q, T> ci,
747       Scan scan) throws Throwable {
748     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
749     byte[] startRow = null;
750     byte[] colFamily = scan.getFamilies()[0];
751     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
752     NavigableMap<byte[], List<S>> map = p.getFirst();
753     S sumVal = p.getSecond().get(0);
754     S sumWeights = p.getSecond().get(1);
755     double halfSumVal = ci.divideForAvg(sumVal, 2L);
756     double movingSumVal = 0;
757     boolean weighted = false;
758     if (quals.size() > 1) {
759       weighted = true;
760       halfSumVal = ci.divideForAvg(sumWeights, 2L);
761     }
762     
763     for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
764       S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
765       double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
766       if (newSumVal > halfSumVal) break;  // we found the region with the median
767       movingSumVal = newSumVal;
768       startRow = entry.getKey();
769     }
770     // scan the region with median and find it
771     Scan scan2 = new Scan(scan);
772     // inherit stop row from method parameter
773     if (startRow != null) scan2.setStartRow(startRow);
774     ResultScanner scanner = null;
775     try {
776       int cacheSize = scan2.getCaching();
777       if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
778         scan2.setCacheBlocks(true);
779         cacheSize = 5;
780         scan2.setCaching(cacheSize);
781       }
782       scanner = table.getScanner(scan2);
783       Result[] results = null;
784       byte[] qualifier = quals.pollFirst();
785       // qualifier for the weight column
786       byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
787       R value = null;
788       do {
789         results = scanner.next(cacheSize);
790         if (results != null && results.length > 0) {
791           for (int i = 0; i < results.length; i++) {
792             Result r = results[i];
793             // retrieve weight
794             KeyValue kv = r.getColumnLatest(colFamily, weightQualifier);
795             R newValue = ci.getValue(colFamily, weightQualifier, kv);
796             S s = ci.castToReturnType(newValue);
797             double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
798             // see if we have moved past the median
799             if (newSumVal > halfSumVal) {
800               return value;
801             }
802             movingSumVal = newSumVal;
803             kv = r.getColumnLatest(colFamily, qualifier);
804             value = ci.getValue(colFamily, qualifier, kv);
805             }
806           }
807       } while (results != null && results.length > 0);
808     } finally {
809       if (scanner != null) {
810         scanner.close();
811       }
812     }
813     return null;
814   }
815 
816   <R, S, P extends Message, Q extends Message, T extends Message> AggregateArgument 
817   validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci)
818       throws IOException {
819     validateParameters(scan);
820     final AggregateArgument.Builder requestBuilder = 
821         AggregateArgument.newBuilder();
822     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
823     P columnInterpreterSpecificData = null;
824     if ((columnInterpreterSpecificData = ci.getRequestData()) 
825        != null) {
826       requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
827     }
828     requestBuilder.setScan(ProtobufUtil.toScan(scan));
829     return requestBuilder.build();
830   }
831 
832   byte[] getBytesFromResponse(ByteString response) {
833     ByteBuffer bb = response.asReadOnlyByteBuffer();
834     bb.rewind();
835     byte[] bytes;
836     if (bb.hasArray()) {
837       bytes = bb.array();
838     } else {
839       bytes = response.toByteArray();
840     }
841     return bytes;
842   }
843 }