View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client.coprocessor;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.NavigableSet;
30  import java.util.TreeMap;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.client.Connection;
41  import org.apache.hadoop.hbase.client.ConnectionFactory;
42  import org.apache.hadoop.hbase.client.Result;
43  import org.apache.hadoop.hbase.client.ResultScanner;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.client.Table;
46  import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
47  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
48  import org.apache.hadoop.hbase.ipc.ServerRpcController;
49  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
50  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
51  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
52  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.apache.hadoop.hbase.util.Pair;
55  
56  import com.google.protobuf.ByteString;
57  import com.google.protobuf.Message;
58  
59  /**
60   * This client class is for invoking the aggregate functions deployed on the
61   * Region Server side via the AggregateService. This class will implement the
62   * supporting functionality for summing/processing the individual results
63   * obtained from the AggregateService for each region.
64   * <p>
65   * This will serve as the client side handler for invoking the aggregate
66   * functions.
67   * For all aggregate functions,
68   * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
 * <li>Column family can't be null. If multiple families are
 * provided, an IOException will be thrown. An optional column qualifier can
 * also be defined.</li>
 * <li>The methods that find the maximum, minimum and sum return the
 * parameter type. Average and std return a double value. Row
 * count returns a long value.</li>
77   * </ul>
78   * <p>Call {@link #close()} when done.
79   */
80  @InterfaceAudience.Private
81  public class AggregationClient implements Closeable {
// TODO: This class is not used.  Move to examples?
/** Class-wide logger. */
private static final Log log = LogFactory.getLog(AggregationClient.class);

/**
 * Connection opened once by the constructor and reused by every aggregate call made through
 * this client; released by {@link #close()}.
 */
private final Connection connection;

/**
 * Creates a client and eagerly opens the {@link Connection} that all aggregate calls share.
 * @param cfg configuration passed to {@link ConnectionFactory#createConnection(Configuration)}
 * @throws RuntimeException wrapping the {@link IOException} if the connection cannot be opened
 */
public AggregationClient(Configuration cfg) {
  Connection opened;
  try {
    opened = ConnectionFactory.createConnection(cfg);
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
  this.connection = opened;
}
98  
99    @Override
100   public void close() throws IOException {
101     if (this.connection != null && !this.connection.isClosed()) {
102       this.connection.close();
103     }
104   }
105 
106   /**
107    * It gives the maximum value of a column for a given column family for the
108    * given range. In case qualifier is null, a max of all values for the given
109    * family is returned.
110    * @param tableName
111    * @param ci
112    * @param scan
113    * @return max val &lt;R&gt;
114    * @throws Throwable
115    *           The caller is supposed to handle the exception as they are thrown
116    *           &amp; propagated to it.
117    */
118   public <R, S, P extends Message, Q extends Message, T extends Message> R max(
119       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
120   throws Throwable {
121     try (Table table = connection.getTable(tableName)) {
122       return max(table, ci, scan);
123     }
124   }
125 
126   /**
127    * It gives the maximum value of a column for a given column family for the
128    * given range. In case qualifier is null, a max of all values for the given
129    * family is returned.
130    * @param table
131    * @param ci
132    * @param scan
133    * @return max val &lt;&gt;
134    * @throws Throwable
135    *           The caller is supposed to handle the exception as they are thrown
136    *           &amp; propagated to it.
137    */
138   public <R, S, P extends Message, Q extends Message, T extends Message> 
139   R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
140       final Scan scan) throws Throwable {
141     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
142     class MaxCallBack implements Batch.Callback<R> {
143       R max = null;
144 
145       R getMax() {
146         return max;
147       }
148 
149       @Override
150       public synchronized void update(byte[] region, byte[] row, R result) {
151         max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
152       }
153     }
154     MaxCallBack aMaxCallBack = new MaxCallBack();
155     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
156         new Batch.Call<AggregateService, R>() {
157           @Override
158           public R call(AggregateService instance) throws IOException {
159             ServerRpcController controller = new ServerRpcController();
160             BlockingRpcCallback<AggregateResponse> rpcCallback = 
161                 new BlockingRpcCallback<AggregateResponse>();
162             instance.getMax(controller, requestArg, rpcCallback);
163             AggregateResponse response = rpcCallback.get();
164             if (controller.failedOnException()) {
165               throw controller.getFailedOn();
166             }
167             if (response.getFirstPartCount() > 0) {
168               ByteString b = response.getFirstPart(0);
169               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
170               return ci.getCellValueFromProto(q);
171             }
172             return null;
173           }
174         }, aMaxCallBack);
175     return aMaxCallBack.getMax();
176   }
177 
178   /*
179    * @param scan
180    * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
181    */
182   private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
183     if (scan == null
184         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
185           scan.getStartRow(), HConstants.EMPTY_START_ROW))
186         || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
187           scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
188       throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
189     } else if (!canFamilyBeAbsent) {
190       if (scan.getFamilyMap().size() != 1) {
191         throw new IOException("There must be only one family.");
192       }
193     }
194   }
195 
196   /**
197    * It gives the minimum value of a column for a given column family for the
198    * given range. In case qualifier is null, a min of all values for the given
199    * family is returned.
200    * @param tableName
201    * @param ci
202    * @param scan
203    * @return min val &lt;R&gt;
204    * @throws Throwable
205    */
206   public <R, S, P extends Message, Q extends Message, T extends Message> R min(
207       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
208   throws Throwable {
209     try (Table table = connection.getTable(tableName)) {
210       return min(table, ci, scan);
211     }
212   }
213 
214   /**
215    * It gives the minimum value of a column for a given column family for the
216    * given range. In case qualifier is null, a min of all values for the given
217    * family is returned.
218    * @param table
219    * @param ci
220    * @param scan
221    * @return min val &lt;R&gt;
222    * @throws Throwable
223    */
224   public <R, S, P extends Message, Q extends Message, T extends Message> 
225   R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
226       final Scan scan) throws Throwable {
227     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
228     class MinCallBack implements Batch.Callback<R> {
229 
230       private R min = null;
231 
232       public R getMinimum() {
233         return min;
234       }
235 
236       @Override
237       public synchronized void update(byte[] region, byte[] row, R result) {
238         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
239       }
240     }
241     MinCallBack minCallBack = new MinCallBack();
242     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
243         new Batch.Call<AggregateService, R>() {
244 
245           @Override
246           public R call(AggregateService instance) throws IOException {
247             ServerRpcController controller = new ServerRpcController();
248             BlockingRpcCallback<AggregateResponse> rpcCallback = 
249                 new BlockingRpcCallback<AggregateResponse>();
250             instance.getMin(controller, requestArg, rpcCallback);
251             AggregateResponse response = rpcCallback.get();
252             if (controller.failedOnException()) {
253               throw controller.getFailedOn();
254             }
255             if (response.getFirstPartCount() > 0) {
256               ByteString b = response.getFirstPart(0);
257               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
258               return ci.getCellValueFromProto(q);
259             }
260             return null;
261           }
262         }, minCallBack);
263     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
264     return minCallBack.getMinimum();
265   }
266 
267   /**
268    * It gives the row count, by summing up the individual results obtained from
269    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
270    * optimised the operation. In case qualifier is provided, I can't use the
271    * filter as it may set the flag to skip to next row, but the value read is
272    * not of the given filter: in this case, this particular row will not be
273    * counted ==&gt; an error.
274    * @param tableName
275    * @param ci
276    * @param scan
277    * @return &lt;R, S&gt;
278    * @throws Throwable
279    */
280   public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
281       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
282   throws Throwable {
283     try (Table table = connection.getTable(tableName)) {
284         return rowCount(table, ci, scan);
285     }
286   }
287 
288   /**
289    * It gives the row count, by summing up the individual results obtained from
290    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
291    * optimised the operation. In case qualifier is provided, I can't use the
292    * filter as it may set the flag to skip to next row, but the value read is
293    * not of the given filter: in this case, this particular row will not be
294    * counted ==&gt; an error.
295    * @param table
296    * @param ci
297    * @param scan
298    * @return &lt;R, S&gt;
299    * @throws Throwable
300    */
301   public <R, S, P extends Message, Q extends Message, T extends Message> 
302   long rowCount(final Table table,
303       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
304     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
305     class RowNumCallback implements Batch.Callback<Long> {
306       private final AtomicLong rowCountL = new AtomicLong(0);
307 
308       public long getRowNumCount() {
309         return rowCountL.get();
310       }
311 
312       @Override
313       public void update(byte[] region, byte[] row, Long result) {
314         rowCountL.addAndGet(result.longValue());
315       }
316     }
317     RowNumCallback rowNum = new RowNumCallback();
318     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
319         new Batch.Call<AggregateService, Long>() {
320           @Override
321           public Long call(AggregateService instance) throws IOException {
322             ServerRpcController controller = new ServerRpcController();
323             BlockingRpcCallback<AggregateResponse> rpcCallback = 
324                 new BlockingRpcCallback<AggregateResponse>();
325             instance.getRowNum(controller, requestArg, rpcCallback);
326             AggregateResponse response = rpcCallback.get();
327             if (controller.failedOnException()) {
328               throw controller.getFailedOn();
329             }
330             byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
331             ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
332             bb.rewind();
333             return bb.getLong();
334           }
335         }, rowNum);
336     return rowNum.getRowNumCount();
337   }
338 
339   /**
340    * It sums up the value returned from various regions. In case qualifier is
341    * null, summation of all the column qualifiers in the given family is done.
342    * @param tableName
343    * @param ci
344    * @param scan
345    * @return sum &lt;S&gt;
346    * @throws Throwable
347    */
348   public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
349       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
350   throws Throwable {
351     try (Table table = connection.getTable(tableName)) {
352         return sum(table, ci, scan);
353     }
354   }
355 
356   /**
357    * It sums up the value returned from various regions. In case qualifier is
358    * null, summation of all the column qualifiers in the given family is done.
359    * @param table
360    * @param ci
361    * @param scan
362    * @return sum &lt;S&gt;
363    * @throws Throwable
364    */
365   public <R, S, P extends Message, Q extends Message, T extends Message> 
366   S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
367       final Scan scan) throws Throwable {
368     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
369     
370     class SumCallBack implements Batch.Callback<S> {
371       S sumVal = null;
372 
373       public S getSumResult() {
374         return sumVal;
375       }
376 
377       @Override
378       public synchronized void update(byte[] region, byte[] row, S result) {
379         sumVal = ci.add(sumVal, result);
380       }
381     }
382     SumCallBack sumCallBack = new SumCallBack();
383     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
384         new Batch.Call<AggregateService, S>() {
385           @Override
386           public S call(AggregateService instance) throws IOException {
387             ServerRpcController controller = new ServerRpcController();
388             BlockingRpcCallback<AggregateResponse> rpcCallback = 
389                 new BlockingRpcCallback<AggregateResponse>();
390             instance.getSum(controller, requestArg, rpcCallback);
391             AggregateResponse response = rpcCallback.get();
392             if (controller.failedOnException()) {
393               throw controller.getFailedOn();
394             }
395             if (response.getFirstPartCount() == 0) {
396               return null;
397             }
398             ByteString b = response.getFirstPart(0);
399             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
400             S s = ci.getPromotedValueFromProto(t);
401             return s;
402           }
403         }, sumCallBack);
404     return sumCallBack.getSumResult();
405   }
406 
407   /**
408    * It computes average while fetching sum and row count from all the
409    * corresponding regions. Approach is to compute a global sum of region level
410    * sum and rowcount and then compute the average.
411    * @param tableName
412    * @param scan
413    * @throws Throwable
414    */
415   private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
416       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
417       throws Throwable {
418     try (Table table = connection.getTable(tableName)) {
419         return getAvgArgs(table, ci, scan);
420     }
421   }
422 
423   /**
424    * It computes average while fetching sum and row count from all the
425    * corresponding regions. Approach is to compute a global sum of region level
426    * sum and rowcount and then compute the average.
427    * @param table
428    * @param scan
429    * @throws Throwable
430    */
431   private <R, S, P extends Message, Q extends Message, T extends Message>
432   Pair<S, Long> getAvgArgs(final Table table,
433       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
434     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
435     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
436       S sum = null;
437       Long rowCount = 0l;
438 
439       public synchronized Pair<S, Long> getAvgArgs() {
440         return new Pair<S, Long>(sum, rowCount);
441       }
442 
443       @Override
444       public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
445         sum = ci.add(sum, result.getFirst());
446         rowCount += result.getSecond();
447       }
448     }
449     AvgCallBack avgCallBack = new AvgCallBack();
450     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
451         new Batch.Call<AggregateService, Pair<S, Long>>() {
452           @Override
453           public Pair<S, Long> call(AggregateService instance) throws IOException {
454             ServerRpcController controller = new ServerRpcController();
455             BlockingRpcCallback<AggregateResponse> rpcCallback = 
456                 new BlockingRpcCallback<AggregateResponse>();
457             instance.getAvg(controller, requestArg, rpcCallback);
458             AggregateResponse response = rpcCallback.get();
459             if (controller.failedOnException()) {
460               throw controller.getFailedOn();
461             }
462             Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
463             if (response.getFirstPartCount() == 0) {
464               return pair;
465             }
466             ByteString b = response.getFirstPart(0);
467             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
468             S s = ci.getPromotedValueFromProto(t);
469             pair.setFirst(s);
470             ByteBuffer bb = ByteBuffer.allocate(8).put(
471                 getBytesFromResponse(response.getSecondPart()));
472             bb.rewind();
473             pair.setSecond(bb.getLong());
474             return pair;
475           }
476         }, avgCallBack);
477     return avgCallBack.getAvgArgs();
478   }
479 
480   /**
481    * This is the client side interface/handle for calling the average method for
482    * a given cf-cq combination. It was necessary to add one more call stack as
483    * its return type should be a decimal value, irrespective of what
484    * columninterpreter says. So, this methods collects the necessary parameters
485    * to compute the average and returs the double value.
486    * @param tableName
487    * @param ci
488    * @param scan
489    * @return &lt;R, S&gt;
490    * @throws Throwable
491    */
492   public <R, S, P extends Message, Q extends Message, T extends Message>
493   double avg(final TableName tableName,
494       final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
495     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
496     return ci.divideForAvg(p.getFirst(), p.getSecond());
497   }
498 
499   /**
500    * This is the client side interface/handle for calling the average method for
501    * a given cf-cq combination. It was necessary to add one more call stack as
502    * its return type should be a decimal value, irrespective of what
503    * columninterpreter says. So, this methods collects the necessary parameters
504    * to compute the average and returs the double value.
505    * @param table
506    * @param ci
507    * @param scan
508    * @return &lt;R, S&gt;
509    * @throws Throwable
510    */
511   public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
512       final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
513     Pair<S, Long> p = getAvgArgs(table, ci, scan);
514     return ci.divideForAvg(p.getFirst(), p.getSecond());
515   }
516 
517   /**
518    * It computes a global standard deviation for a given column and its value.
519    * Standard deviation is square root of (average of squares -
520    * average*average). From individual regions, it obtains sum, square sum and
521    * number of rows. With these, the above values are computed to get the global
522    * std.
523    * @param table
524    * @param scan
525    * @return standard deviations
526    * @throws Throwable
527    */
528   private <R, S, P extends Message, Q extends Message, T extends Message>
529   Pair<List<S>, Long> getStdArgs(final Table table,
530       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
531     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
532     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
533       long rowCountVal = 0l;
534       S sumVal = null, sumSqVal = null;
535 
536       public synchronized Pair<List<S>, Long> getStdParams() {
537         List<S> l = new ArrayList<S>();
538         l.add(sumVal);
539         l.add(sumSqVal);
540         Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
541         return p;
542       }
543 
544       @Override
545       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
546         if (result.getFirst().size() > 0) {
547           sumVal = ci.add(sumVal, result.getFirst().get(0));
548           sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
549           rowCountVal += result.getSecond();
550         }
551       }
552     }
553     StdCallback stdCallback = new StdCallback();
554     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
555         new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
556           @Override
557           public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
558             ServerRpcController controller = new ServerRpcController();
559             BlockingRpcCallback<AggregateResponse> rpcCallback = 
560                 new BlockingRpcCallback<AggregateResponse>();
561             instance.getStd(controller, requestArg, rpcCallback);
562             AggregateResponse response = rpcCallback.get();
563             if (controller.failedOnException()) {
564               throw controller.getFailedOn();
565             }
566             Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
567             if (response.getFirstPartCount() == 0) {
568               return pair;
569             }
570             List<S> list = new ArrayList<S>();
571             for (int i = 0; i < response.getFirstPartCount(); i++) {
572               ByteString b = response.getFirstPart(i);
573               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
574               S s = ci.getPromotedValueFromProto(t);
575               list.add(s);
576             }
577             pair.setFirst(list);
578             ByteBuffer bb = ByteBuffer.allocate(8).put(
579                 getBytesFromResponse(response.getSecondPart()));
580             bb.rewind();
581             pair.setSecond(bb.getLong());
582             return pair;
583           }
584         }, stdCallback);
585     return stdCallback.getStdParams();
586   }
587 
588   /**
589    * This is the client side interface/handle for calling the std method for a
590    * given cf-cq combination. It was necessary to add one more call stack as its
591    * return type should be a decimal value, irrespective of what
592    * columninterpreter says. So, this methods collects the necessary parameters
593    * to compute the std and returns the double value.
594    * @param tableName
595    * @param ci
596    * @param scan
597    * @return &lt;R, S&gt;
598    * @throws Throwable
599    */
600   public <R, S, P extends Message, Q extends Message, T extends Message>
601   double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
602       Scan scan) throws Throwable {
603     try (Table table = connection.getTable(tableName)) {
604         return std(table, ci, scan);
605     }
606   }
607 
608   /**
609    * This is the client side interface/handle for calling the std method for a
610    * given cf-cq combination. It was necessary to add one more call stack as its
611    * return type should be a decimal value, irrespective of what
612    * columninterpreter says. So, this methods collects the necessary parameters
613    * to compute the std and returns the double value.
614    * @param table
615    * @param ci
616    * @param scan
617    * @return &lt;R, S&gt;
618    * @throws Throwable
619    */
620   public <R, S, P extends Message, Q extends Message, T extends Message> double std(
621       final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
622     Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
623     double res = 0d;
624     double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
625     double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
626     res = avgOfSumSq - (avg) * (avg); // variance
627     res = Math.pow(res, 0.5);
628     return res;
629   }
630 
631   /**
632    * It helps locate the region with median for a given column whose weight 
633    * is specified in an optional column.
634    * From individual regions, it obtains sum of values and sum of weights.
635    * @param table
636    * @param ci
637    * @param scan
638    * @return pair whose first element is a map between start row of the region
639    *  and (sum of values, sum of weights) for the region, the second element is
640    *  (sum of values, sum of weights) for all the regions chosen
641    * @throws Throwable
642    */
643   private <R, S, P extends Message, Q extends Message, T extends Message>
644   Pair<NavigableMap<byte[], List<S>>, List<S>>
645   getMedianArgs(final Table table,
646       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
647     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
648     final NavigableMap<byte[], List<S>> map =
649       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
650     class StdCallback implements Batch.Callback<List<S>> {
651       S sumVal = null, sumWeights = null;
652 
653       public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
654         List<S> l = new ArrayList<S>();
655         l.add(sumVal);
656         l.add(sumWeights);
657         Pair<NavigableMap<byte[], List<S>>, List<S>> p =
658           new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
659         return p;
660       }
661 
662       @Override
663       public synchronized void update(byte[] region, byte[] row, List<S> result) {
664         map.put(row, result);
665         sumVal = ci.add(sumVal, result.get(0));
666         sumWeights = ci.add(sumWeights, result.get(1));
667       }
668     }
669     StdCallback stdCallback = new StdCallback();
670     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
671         new Batch.Call<AggregateService, List<S>>() {
672           @Override
673           public List<S> call(AggregateService instance) throws IOException {
674             ServerRpcController controller = new ServerRpcController();
675             BlockingRpcCallback<AggregateResponse> rpcCallback = 
676                 new BlockingRpcCallback<AggregateResponse>();
677             instance.getMedian(controller, requestArg, rpcCallback);
678             AggregateResponse response = rpcCallback.get();
679             if (controller.failedOnException()) {
680               throw controller.getFailedOn();
681             }
682 
683             List<S> list = new ArrayList<S>();
684             for (int i = 0; i < response.getFirstPartCount(); i++) {
685               ByteString b = response.getFirstPart(i);
686               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
687               S s = ci.getPromotedValueFromProto(t);
688               list.add(s);
689             }
690             return list;
691           }
692 
693         }, stdCallback);
694     return stdCallback.getMedianParams();
695   }
696 
697   /**
698    * This is the client side interface/handler for calling the median method for a
699    * given cf-cq combination. This method collects the necessary parameters
700    * to compute the median and returns the median.
701    * @param tableName
702    * @param ci
703    * @param scan
704    * @return R the median
705    * @throws Throwable
706    */
707   public <R, S, P extends Message, Q extends Message, T extends Message>
708   R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
709       Scan scan) throws Throwable {
710     try (Table table = connection.getTable(tableName)) {
711         return median(table, ci, scan);
712     }
713   }
714 
715   /**
716    * This is the client side interface/handler for calling the median method for a
717    * given cf-cq combination. This method collects the necessary parameters
718    * to compute the median and returns the median.
719    * @param table
720    * @param ci
721    * @param scan
722    * @return R the median
723    * @throws Throwable
724    */
725   public <R, S, P extends Message, Q extends Message, T extends Message>
726   R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
727       Scan scan) throws Throwable {
728     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
729     byte[] startRow = null;
730     byte[] colFamily = scan.getFamilies()[0];
731     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
732     NavigableMap<byte[], List<S>> map = p.getFirst();
733     S sumVal = p.getSecond().get(0);
734     S sumWeights = p.getSecond().get(1);
735     double halfSumVal = ci.divideForAvg(sumVal, 2L);
736     double movingSumVal = 0;
737     boolean weighted = false;
738     if (quals.size() > 1) {
739       weighted = true;
740       halfSumVal = ci.divideForAvg(sumWeights, 2L);
741     }
742     
743     for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
744       S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
745       double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
746       if (newSumVal > halfSumVal) break;  // we found the region with the median
747       movingSumVal = newSumVal;
748       startRow = entry.getKey();
749     }
750     // scan the region with median and find it
751     Scan scan2 = new Scan(scan);
752     // inherit stop row from method parameter
753     if (startRow != null) scan2.setStartRow(startRow);
754     ResultScanner scanner = null;
755     try {
756       int cacheSize = scan2.getCaching();
757       if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
758         scan2.setCacheBlocks(true);
759         cacheSize = 5;
760         scan2.setCaching(cacheSize);
761       }
762       scanner = table.getScanner(scan2);
763       Result[] results = null;
764       byte[] qualifier = quals.pollFirst();
765       // qualifier for the weight column
766       byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
767       R value = null;
768       do {
769         results = scanner.next(cacheSize);
770         if (results != null && results.length > 0) {
771           for (int i = 0; i < results.length; i++) {
772             Result r = results[i];
773             // retrieve weight
774             Cell kv = r.getColumnLatestCell(colFamily, weightQualifier);
775             R newValue = ci.getValue(colFamily, weightQualifier, kv);
776             S s = ci.castToReturnType(newValue);
777             double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
778             // see if we have moved past the median
779             if (newSumVal > halfSumVal) {
780               return value;
781             }
782             movingSumVal = newSumVal;
783             kv = r.getColumnLatestCell(colFamily, qualifier);
784             value = ci.getValue(colFamily, qualifier, kv);
785             }
786           }
787       } while (results != null && results.length > 0);
788     } finally {
789       if (scanner != null) {
790         scanner.close();
791       }
792     }
793     return null;
794   }
795 
796   <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest 
797   validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
798       throws IOException {
799     validateParameters(scan, canFamilyBeAbsent);
800     final AggregateRequest.Builder requestBuilder = 
801         AggregateRequest.newBuilder();
802     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
803     P columnInterpreterSpecificData = null;
804     if ((columnInterpreterSpecificData = ci.getRequestData()) 
805        != null) {
806       requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
807     }
808     requestBuilder.setScan(ProtobufUtil.toScan(scan));
809     return requestBuilder.build();
810   }
811 
812   byte[] getBytesFromResponse(ByteString response) {
813     ByteBuffer bb = response.asReadOnlyByteBuffer();
814     bb.rewind();
815     byte[] bytes;
816     if (bb.hasArray()) {
817       bytes = bb.array();
818     } else {
819       bytes = response.toByteArray();
820     }
821     return bytes;
822   }
823 }