View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client.coprocessor;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.NavigableSet;
30  import java.util.TreeMap;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.TableName;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.Connection;
42  import org.apache.hadoop.hbase.client.ConnectionFactory;
43  import org.apache.hadoop.hbase.client.Result;
44  import org.apache.hadoop.hbase.client.ResultScanner;
45  import org.apache.hadoop.hbase.client.Scan;
46  import org.apache.hadoop.hbase.client.Table;
47  import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
48  import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
49  import org.apache.hadoop.hbase.ipc.ServerRpcController;
50  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
51  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
52  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
53  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.util.Pair;
56  
57  import com.google.protobuf.ByteString;
58  import com.google.protobuf.Message;
59  
60  /**
61   * This client class is for invoking the aggregate functions deployed on the
62   * Region Server side via the AggregateService. This class will implement the
63   * supporting functionality for summing/processing the individual results
64   * obtained from the AggregateService for each region.
65   * <p>
66   * This will serve as the client side handler for invoking the aggregate
67   * functions.
68   * For all aggregate functions,
69   * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
 * <li>Column family can't be null. In case where multiple families are
 * provided, an IOException will be thrown. An optional column qualifier can
 * also be defined.</li>
 * <li>For methods to find maximum, minimum and sum, it returns the
 * parameter type. For average and std, it returns a double value. For row
 * count, it returns a long value.</li>
78   * </ul>
79   * <p>Call {@link #close()} when done.
80   */
81  @InterfaceAudience.Private
82  public class AggregationClient implements Closeable {
83    // TODO: This class is not used.  Move to examples?
84    private static final Log log = LogFactory.getLog(AggregationClient.class);
85    private final Connection connection;
86  
87    /**
88     * Constructor with Conf object
89     * @param cfg
90     */
91    public AggregationClient(Configuration cfg) {
92      try {
93        // Create a connection on construction. Will use it making each of the calls below.
94        this.connection = ConnectionFactory.createConnection(cfg);
95      } catch (IOException e) {
96        throw new RuntimeException(e);
97      }
98    }
99  
100   @Override
101   public void close() throws IOException {
102     if (this.connection != null && !this.connection.isClosed()) {
103       this.connection.close();
104     }
105   }
106 
107   /**
108    * It gives the maximum value of a column for a given column family for the
109    * given range. In case qualifier is null, a max of all values for the given
110    * family is returned.
111    * @param tableName
112    * @param ci
113    * @param scan
114    * @return max val &lt;R&gt;
115    * @throws Throwable
116    *           The caller is supposed to handle the exception as they are thrown
117    *           &amp; propagated to it.
118    */
119   public <R, S, P extends Message, Q extends Message, T extends Message> R max(
120       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
121   throws Throwable {
122     try (Table table = connection.getTable(tableName)) {
123       return max(table, ci, scan);
124     }
125   }
126 
127   /**
128    * It gives the maximum value of a column for a given column family for the
129    * given range. In case qualifier is null, a max of all values for the given
130    * family is returned.
131    * @param table
132    * @param ci
133    * @param scan
134    * @return max val &lt;&gt;
135    * @throws Throwable
136    *           The caller is supposed to handle the exception as they are thrown
137    *           &amp; propagated to it.
138    */
139   public <R, S, P extends Message, Q extends Message, T extends Message>
140   R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
141       final Scan scan) throws Throwable {
142     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
143     class MaxCallBack implements Batch.Callback<R> {
144       R max = null;
145 
146       R getMax() {
147         return max;
148       }
149 
150       @Override
151       public synchronized void update(byte[] region, byte[] row, R result) {
152         max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
153       }
154     }
155     MaxCallBack aMaxCallBack = new MaxCallBack();
156     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
157         new Batch.Call<AggregateService, R>() {
158           @Override
159           public R call(AggregateService instance) throws IOException {
160             ServerRpcController controller = new ServerRpcController();
161             BlockingRpcCallback<AggregateResponse> rpcCallback =
162                 new BlockingRpcCallback<AggregateResponse>();
163             instance.getMax(controller, requestArg, rpcCallback);
164             AggregateResponse response = rpcCallback.get();
165             if (controller.failedOnException()) {
166               throw controller.getFailedOn();
167             }
168             if (response.getFirstPartCount() > 0) {
169               ByteString b = response.getFirstPart(0);
170               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
171               return ci.getCellValueFromProto(q);
172             }
173             return null;
174           }
175         }, aMaxCallBack);
176     return aMaxCallBack.getMax();
177   }
178 
179   /*
180    * @param scan
181    * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
182    */
183   private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
184     if (scan == null
185         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
186             .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
187         || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
188           !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
189       throw new IOException(
190           "Agg client Exception: Startrow should be smaller than Stoprow");
191     } else if (!canFamilyBeAbsent) {
192       if (scan.getFamilyMap().size() != 1) {
193         throw new IOException("There must be only one family.");
194       }
195     }
196   }
197 
198   /**
199    * It gives the minimum value of a column for a given column family for the
200    * given range. In case qualifier is null, a min of all values for the given
201    * family is returned.
202    * @param tableName
203    * @param ci
204    * @param scan
205    * @return min val &lt;R&gt;
206    * @throws Throwable
207    */
208   public <R, S, P extends Message, Q extends Message, T extends Message> R min(
209       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
210   throws Throwable {
211     try (Table table = connection.getTable(tableName)) {
212       return min(table, ci, scan);
213     }
214   }
215 
216   /**
217    * It gives the minimum value of a column for a given column family for the
218    * given range. In case qualifier is null, a min of all values for the given
219    * family is returned.
220    * @param table
221    * @param ci
222    * @param scan
223    * @return min val &lt;R&gt;
224    * @throws Throwable
225    */
226   public <R, S, P extends Message, Q extends Message, T extends Message>
227   R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
228       final Scan scan) throws Throwable {
229     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
230     class MinCallBack implements Batch.Callback<R> {
231 
232       private R min = null;
233 
234       public R getMinimum() {
235         return min;
236       }
237 
238       @Override
239       public synchronized void update(byte[] region, byte[] row, R result) {
240         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
241       }
242     }
243     MinCallBack minCallBack = new MinCallBack();
244     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
245         new Batch.Call<AggregateService, R>() {
246 
247           @Override
248           public R call(AggregateService instance) throws IOException {
249             ServerRpcController controller = new ServerRpcController();
250             BlockingRpcCallback<AggregateResponse> rpcCallback =
251                 new BlockingRpcCallback<AggregateResponse>();
252             instance.getMin(controller, requestArg, rpcCallback);
253             AggregateResponse response = rpcCallback.get();
254             if (controller.failedOnException()) {
255               throw controller.getFailedOn();
256             }
257             if (response.getFirstPartCount() > 0) {
258               ByteString b = response.getFirstPart(0);
259               Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
260               return ci.getCellValueFromProto(q);
261             }
262             return null;
263           }
264         }, minCallBack);
265     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
266     return minCallBack.getMinimum();
267   }
268 
269   /**
270    * It gives the row count, by summing up the individual results obtained from
271    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
272    * optimised the operation. In case qualifier is provided, I can't use the
273    * filter as it may set the flag to skip to next row, but the value read is
274    * not of the given filter: in this case, this particular row will not be
275    * counted ==&gt; an error.
276    * @param tableName
277    * @param ci
278    * @param scan
279    * @return &lt;R, S&gt;
280    * @throws Throwable
281    */
282   public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
283       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
284   throws Throwable {
285     try (Table table = connection.getTable(tableName)) {
286         return rowCount(table, ci, scan);
287     }
288   }
289 
290   /**
291    * It gives the row count, by summing up the individual results obtained from
292    * regions. In case the qualifier is null, FirstKeyValueFilter is used to
293    * optimised the operation. In case qualifier is provided, I can't use the
294    * filter as it may set the flag to skip to next row, but the value read is
295    * not of the given filter: in this case, this particular row will not be
296    * counted ==&gt; an error.
297    * @param table
298    * @param ci
299    * @param scan
300    * @return &lt;R, S&gt;
301    * @throws Throwable
302    */
303   public <R, S, P extends Message, Q extends Message, T extends Message>
304   long rowCount(final Table table,
305       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
306     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
307     class RowNumCallback implements Batch.Callback<Long> {
308       private final AtomicLong rowCountL = new AtomicLong(0);
309 
310       public long getRowNumCount() {
311         return rowCountL.get();
312       }
313 
314       @Override
315       public void update(byte[] region, byte[] row, Long result) {
316         rowCountL.addAndGet(result.longValue());
317       }
318     }
319     RowNumCallback rowNum = new RowNumCallback();
320     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
321         new Batch.Call<AggregateService, Long>() {
322           @Override
323           public Long call(AggregateService instance) throws IOException {
324             ServerRpcController controller = new ServerRpcController();
325             BlockingRpcCallback<AggregateResponse> rpcCallback =
326                 new BlockingRpcCallback<AggregateResponse>();
327             instance.getRowNum(controller, requestArg, rpcCallback);
328             AggregateResponse response = rpcCallback.get();
329             if (controller.failedOnException()) {
330               throw controller.getFailedOn();
331             }
332             byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
333             ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
334             bb.rewind();
335             return bb.getLong();
336           }
337         }, rowNum);
338     return rowNum.getRowNumCount();
339   }
340 
341   /**
342    * It sums up the value returned from various regions. In case qualifier is
343    * null, summation of all the column qualifiers in the given family is done.
344    * @param tableName
345    * @param ci
346    * @param scan
347    * @return sum &lt;S&gt;
348    * @throws Throwable
349    */
350   public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
351       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
352   throws Throwable {
353     try (Table table = connection.getTable(tableName)) {
354         return sum(table, ci, scan);
355     }
356   }
357 
358   /**
359    * It sums up the value returned from various regions. In case qualifier is
360    * null, summation of all the column qualifiers in the given family is done.
361    * @param table
362    * @param ci
363    * @param scan
364    * @return sum &lt;S&gt;
365    * @throws Throwable
366    */
367   public <R, S, P extends Message, Q extends Message, T extends Message>
368   S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
369       final Scan scan) throws Throwable {
370     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
371 
372     class SumCallBack implements Batch.Callback<S> {
373       S sumVal = null;
374 
375       public S getSumResult() {
376         return sumVal;
377       }
378 
379       @Override
380       public synchronized void update(byte[] region, byte[] row, S result) {
381         sumVal = ci.add(sumVal, result);
382       }
383     }
384     SumCallBack sumCallBack = new SumCallBack();
385     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
386         new Batch.Call<AggregateService, S>() {
387           @Override
388           public S call(AggregateService instance) throws IOException {
389             ServerRpcController controller = new ServerRpcController();
390             BlockingRpcCallback<AggregateResponse> rpcCallback =
391                 new BlockingRpcCallback<AggregateResponse>();
392             instance.getSum(controller, requestArg, rpcCallback);
393             AggregateResponse response = rpcCallback.get();
394             if (controller.failedOnException()) {
395               throw controller.getFailedOn();
396             }
397             if (response.getFirstPartCount() == 0) {
398               return null;
399             }
400             ByteString b = response.getFirstPart(0);
401             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
402             S s = ci.getPromotedValueFromProto(t);
403             return s;
404           }
405         }, sumCallBack);
406     return sumCallBack.getSumResult();
407   }
408 
409   /**
410    * It computes average while fetching sum and row count from all the
411    * corresponding regions. Approach is to compute a global sum of region level
412    * sum and rowcount and then compute the average.
413    * @param tableName
414    * @param scan
415    * @throws Throwable
416    */
417   private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
418       final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
419       throws Throwable {
420     try (Table table = connection.getTable(tableName)) {
421         return getAvgArgs(table, ci, scan);
422     }
423   }
424 
425   /**
426    * It computes average while fetching sum and row count from all the
427    * corresponding regions. Approach is to compute a global sum of region level
428    * sum and rowcount and then compute the average.
429    * @param table
430    * @param scan
431    * @throws Throwable
432    */
433   private <R, S, P extends Message, Q extends Message, T extends Message>
434   Pair<S, Long> getAvgArgs(final Table table,
435       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
436     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
437     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
438       S sum = null;
439       Long rowCount = 0l;
440 
441       public synchronized Pair<S, Long> getAvgArgs() {
442         return new Pair<S, Long>(sum, rowCount);
443       }
444 
445       @Override
446       public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
447         sum = ci.add(sum, result.getFirst());
448         rowCount += result.getSecond();
449       }
450     }
451     AvgCallBack avgCallBack = new AvgCallBack();
452     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
453         new Batch.Call<AggregateService, Pair<S, Long>>() {
454           @Override
455           public Pair<S, Long> call(AggregateService instance) throws IOException {
456             ServerRpcController controller = new ServerRpcController();
457             BlockingRpcCallback<AggregateResponse> rpcCallback =
458                 new BlockingRpcCallback<AggregateResponse>();
459             instance.getAvg(controller, requestArg, rpcCallback);
460             AggregateResponse response = rpcCallback.get();
461             if (controller.failedOnException()) {
462               throw controller.getFailedOn();
463             }
464             Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
465             if (response.getFirstPartCount() == 0) {
466               return pair;
467             }
468             ByteString b = response.getFirstPart(0);
469             T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
470             S s = ci.getPromotedValueFromProto(t);
471             pair.setFirst(s);
472             ByteBuffer bb = ByteBuffer.allocate(8).put(
473                 getBytesFromResponse(response.getSecondPart()));
474             bb.rewind();
475             pair.setSecond(bb.getLong());
476             return pair;
477           }
478         }, avgCallBack);
479     return avgCallBack.getAvgArgs();
480   }
481 
482   /**
483    * This is the client side interface/handle for calling the average method for
484    * a given cf-cq combination. It was necessary to add one more call stack as
485    * its return type should be a decimal value, irrespective of what
486    * columninterpreter says. So, this methods collects the necessary parameters
487    * to compute the average and returs the double value.
488    * @param tableName
489    * @param ci
490    * @param scan
491    * @return &lt;R, S&gt;
492    * @throws Throwable
493    */
494   public <R, S, P extends Message, Q extends Message, T extends Message>
495   double avg(final TableName tableName,
496       final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
497     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
498     return ci.divideForAvg(p.getFirst(), p.getSecond());
499   }
500 
501   /**
502    * This is the client side interface/handle for calling the average method for
503    * a given cf-cq combination. It was necessary to add one more call stack as
504    * its return type should be a decimal value, irrespective of what
505    * columninterpreter says. So, this methods collects the necessary parameters
506    * to compute the average and returs the double value.
507    * @param table
508    * @param ci
509    * @param scan
510    * @return &lt;R, S&gt;
511    * @throws Throwable
512    */
513   public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
514       final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
515     Pair<S, Long> p = getAvgArgs(table, ci, scan);
516     return ci.divideForAvg(p.getFirst(), p.getSecond());
517   }
518 
519   /**
520    * It computes a global standard deviation for a given column and its value.
521    * Standard deviation is square root of (average of squares -
522    * average*average). From individual regions, it obtains sum, square sum and
523    * number of rows. With these, the above values are computed to get the global
524    * std.
525    * @param table
526    * @param scan
527    * @return standard deviations
528    * @throws Throwable
529    */
530   private <R, S, P extends Message, Q extends Message, T extends Message>
531   Pair<List<S>, Long> getStdArgs(final Table table,
532       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
533     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
534     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
535       long rowCountVal = 0l;
536       S sumVal = null, sumSqVal = null;
537 
538       public synchronized Pair<List<S>, Long> getStdParams() {
539         List<S> l = new ArrayList<S>();
540         l.add(sumVal);
541         l.add(sumSqVal);
542         Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
543         return p;
544       }
545 
546       @Override
547       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
548         if (result.getFirst().size() > 0) {
549           sumVal = ci.add(sumVal, result.getFirst().get(0));
550           sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
551           rowCountVal += result.getSecond();
552         }
553       }
554     }
555     StdCallback stdCallback = new StdCallback();
556     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
557         new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
558           @Override
559           public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
560             ServerRpcController controller = new ServerRpcController();
561             BlockingRpcCallback<AggregateResponse> rpcCallback =
562                 new BlockingRpcCallback<AggregateResponse>();
563             instance.getStd(controller, requestArg, rpcCallback);
564             AggregateResponse response = rpcCallback.get();
565             if (controller.failedOnException()) {
566               throw controller.getFailedOn();
567             }
568             Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
569             if (response.getFirstPartCount() == 0) {
570               return pair;
571             }
572             List<S> list = new ArrayList<S>();
573             for (int i = 0; i < response.getFirstPartCount(); i++) {
574               ByteString b = response.getFirstPart(i);
575               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
576               S s = ci.getPromotedValueFromProto(t);
577               list.add(s);
578             }
579             pair.setFirst(list);
580             ByteBuffer bb = ByteBuffer.allocate(8).put(
581                 getBytesFromResponse(response.getSecondPart()));
582             bb.rewind();
583             pair.setSecond(bb.getLong());
584             return pair;
585           }
586         }, stdCallback);
587     return stdCallback.getStdParams();
588   }
589 
590   /**
591    * This is the client side interface/handle for calling the std method for a
592    * given cf-cq combination. It was necessary to add one more call stack as its
593    * return type should be a decimal value, irrespective of what
594    * columninterpreter says. So, this methods collects the necessary parameters
595    * to compute the std and returns the double value.
596    * @param tableName
597    * @param ci
598    * @param scan
599    * @return &lt;R, S&gt;
600    * @throws Throwable
601    */
602   public <R, S, P extends Message, Q extends Message, T extends Message>
603   double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
604       Scan scan) throws Throwable {
605     try (Table table = connection.getTable(tableName)) {
606         return std(table, ci, scan);
607     }
608   }
609 
610   /**
611    * This is the client side interface/handle for calling the std method for a
612    * given cf-cq combination. It was necessary to add one more call stack as its
613    * return type should be a decimal value, irrespective of what
614    * columninterpreter says. So, this methods collects the necessary parameters
615    * to compute the std and returns the double value.
616    * @param table
617    * @param ci
618    * @param scan
619    * @return &lt;R, S&gt;
620    * @throws Throwable
621    */
622   public <R, S, P extends Message, Q extends Message, T extends Message> double std(
623       final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
624     Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
625     double res = 0d;
626     double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
627     double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
628     res = avgOfSumSq - (avg) * (avg); // variance
629     res = Math.pow(res, 0.5);
630     return res;
631   }
632 
633   /**
634    * It helps locate the region with median for a given column whose weight
635    * is specified in an optional column.
636    * From individual regions, it obtains sum of values and sum of weights.
637    * @param table
638    * @param ci
639    * @param scan
640    * @return pair whose first element is a map between start row of the region
641    *  and (sum of values, sum of weights) for the region, the second element is
642    *  (sum of values, sum of weights) for all the regions chosen
643    * @throws Throwable
644    */
645   private <R, S, P extends Message, Q extends Message, T extends Message>
646   Pair<NavigableMap<byte[], List<S>>, List<S>>
647   getMedianArgs(final Table table,
648       final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
649     final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
650     final NavigableMap<byte[], List<S>> map =
651       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
652     class StdCallback implements Batch.Callback<List<S>> {
653       S sumVal = null, sumWeights = null;
654 
655       public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
656         List<S> l = new ArrayList<S>();
657         l.add(sumVal);
658         l.add(sumWeights);
659         Pair<NavigableMap<byte[], List<S>>, List<S>> p =
660           new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
661         return p;
662       }
663 
664       @Override
665       public synchronized void update(byte[] region, byte[] row, List<S> result) {
666         map.put(row, result);
667         sumVal = ci.add(sumVal, result.get(0));
668         sumWeights = ci.add(sumWeights, result.get(1));
669       }
670     }
671     StdCallback stdCallback = new StdCallback();
672     table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
673         new Batch.Call<AggregateService, List<S>>() {
674           @Override
675           public List<S> call(AggregateService instance) throws IOException {
676             ServerRpcController controller = new ServerRpcController();
677             BlockingRpcCallback<AggregateResponse> rpcCallback =
678                 new BlockingRpcCallback<AggregateResponse>();
679             instance.getMedian(controller, requestArg, rpcCallback);
680             AggregateResponse response = rpcCallback.get();
681             if (controller.failedOnException()) {
682               throw controller.getFailedOn();
683             }
684 
685             List<S> list = new ArrayList<S>();
686             for (int i = 0; i < response.getFirstPartCount(); i++) {
687               ByteString b = response.getFirstPart(i);
688               T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
689               S s = ci.getPromotedValueFromProto(t);
690               list.add(s);
691             }
692             return list;
693           }
694 
695         }, stdCallback);
696     return stdCallback.getMedianParams();
697   }
698 
699   /**
700    * This is the client side interface/handler for calling the median method for a
701    * given cf-cq combination. This method collects the necessary parameters
702    * to compute the median and returns the median.
703    * @param tableName
704    * @param ci
705    * @param scan
706    * @return R the median
707    * @throws Throwable
708    */
709   public <R, S, P extends Message, Q extends Message, T extends Message>
710   R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
711       Scan scan) throws Throwable {
712     try (Table table = connection.getTable(tableName)) {
713         return median(table, ci, scan);
714     }
715   }
716 
717   /**
718    * This is the client side interface/handler for calling the median method for a
719    * given cf-cq combination. This method collects the necessary parameters
720    * to compute the median and returns the median.
721    * @param table
722    * @param ci
723    * @param scan
724    * @return R the median
725    * @throws Throwable
726    */
727   public <R, S, P extends Message, Q extends Message, T extends Message>
728   R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
729       Scan scan) throws Throwable {
730     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
731     byte[] startRow = null;
732     byte[] colFamily = scan.getFamilies()[0];
733     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
734     NavigableMap<byte[], List<S>> map = p.getFirst();
735     S sumVal = p.getSecond().get(0);
736     S sumWeights = p.getSecond().get(1);
737     double halfSumVal = ci.divideForAvg(sumVal, 2L);
738     double movingSumVal = 0;
739     boolean weighted = false;
740     if (quals.size() > 1) {
741       weighted = true;
742       halfSumVal = ci.divideForAvg(sumWeights, 2L);
743     }
744 
745     for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
746       S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
747       double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
748       if (newSumVal > halfSumVal) break;  // we found the region with the median
749       movingSumVal = newSumVal;
750       startRow = entry.getKey();
751     }
752     // scan the region with median and find it
753     Scan scan2 = new Scan(scan);
754     // inherit stop row from method parameter
755     if (startRow != null) scan2.setStartRow(startRow);
756     ResultScanner scanner = null;
757     try {
758       int cacheSize = scan2.getCaching();
759       if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
760         scan2.setCacheBlocks(true);
761         cacheSize = 5;
762         scan2.setCaching(cacheSize);
763       }
764       scanner = table.getScanner(scan2);
765       Result[] results = null;
766       byte[] qualifier = quals.pollFirst();
767       // qualifier for the weight column
768       byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
769       R value = null;
770       do {
771         results = scanner.next(cacheSize);
772         if (results != null && results.length > 0) {
773           for (int i = 0; i < results.length; i++) {
774             Result r = results[i];
775             // retrieve weight
776             Cell kv = r.getColumnLatest(colFamily, weightQualifier);
777             R newValue = ci.getValue(colFamily, weightQualifier, kv);
778             S s = ci.castToReturnType(newValue);
779             double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
780             // see if we have moved past the median
781             if (newSumVal > halfSumVal) {
782               return value;
783             }
784             movingSumVal = newSumVal;
785             kv = r.getColumnLatest(colFamily, qualifier);
786             value = ci.getValue(colFamily, qualifier, kv);
787             }
788           }
789       } while (results != null && results.length > 0);
790     } finally {
791       if (scanner != null) {
792         scanner.close();
793       }
794     }
795     return null;
796   }
797 
798   <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
799   validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
800       throws IOException {
801     validateParameters(scan, canFamilyBeAbsent);
802     final AggregateRequest.Builder requestBuilder =
803         AggregateRequest.newBuilder();
804     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
805     P columnInterpreterSpecificData = null;
806     if ((columnInterpreterSpecificData = ci.getRequestData())
807        != null) {
808       requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
809     }
810     requestBuilder.setScan(ProtobufUtil.toScan(scan));
811     return requestBuilder.build();
812   }
813 
814   byte[] getBytesFromResponse(ByteString response) {
815     ByteBuffer bb = response.asReadOnlyByteBuffer();
816     bb.rewind();
817     byte[] bytes;
818     if (bb.hasArray()) {
819       bytes = bb.array();
820     } else {
821       bytes = response.toByteArray();
822     }
823     return bytes;
824   }
825 }