View Javadoc

1   /*
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.client.coprocessor;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.NavigableSet;
29  import java.util.TreeMap;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.client.HTable;
38  import org.apache.hadoop.hbase.client.Result;
39  import org.apache.hadoop.hbase.client.ResultScanner;
40  import org.apache.hadoop.hbase.client.Scan;
41  import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
42  import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.Pair;
45  
/**
 * This client class is for invoking the aggregate functions deployed on the
 * Region Server side via the AggregateProtocol. This class will implement the
 * supporting functionality for summing/processing the individual results
 * obtained from the AggregateProtocol for each region.
 * <p>
 * This will serve as the client side handler for invoking the aggregate
 * functions.
 * <p>
 * For all aggregate functions,
 * <ul>
 * <li>start row &lt; end row is an essential condition (if they are not
 * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
 * <li>Column family can't be null. In case where multiple families are
 * provided, an IOException will be thrown. An optional column qualifier can
 * also be defined.</li>
 * <li>The methods to find maximum and minimum return the cell value type R,
 * and sum returns the promoted type S. For average and std, a double value
 * is returned. For row count, a long value is returned.</li>
 * </ul>
 */
65  public class AggregationClient {
66  
67    private static final Log log = LogFactory.getLog(AggregationClient.class);
68    Configuration conf;
69  
70    /**
71     * Constructor with Conf object
72     * @param cfg
73     */
74    public AggregationClient(Configuration cfg) {
75      this.conf = cfg;
76    }
77  
78    /**
79     * It gives the maximum value of a column for a given column family for the
80     * given range. In case qualifier is null, a max of all values for the given
81     * family is returned.
82     * @param tableName
83     * @param ci
84     * @param scan
85     * @return max val <R>
86     * @throws Throwable
87     *           The caller is supposed to handle the exception as they are thrown
88     *           & propagated to it.
89     */
90    public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
91        final Scan scan) throws Throwable {
92      validateParameters(scan);
93      class MaxCallBack implements Batch.Callback<R> {
94        R max = null;
95  
96        R getMax() {
97          return max;
98        }
99  
100       @Override
101       public synchronized void update(byte[] region, byte[] row, R result) {
102         max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
103       }
104     }
105     MaxCallBack aMaxCallBack = new MaxCallBack();
106     HTable table = null;
107     try {
108       table = new HTable(conf, tableName);
109       table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
110           scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
111             @Override
112             public R call(AggregateProtocol instance) throws IOException {
113               return instance.getMax(ci, scan);
114             }
115           }, aMaxCallBack);
116     } finally {
117       if (table != null) {
118         table.close();
119       }
120     }
121     return aMaxCallBack.getMax();
122   }
123 
124   private void validateParameters(Scan scan) throws IOException {
125     if (scan == null
126         || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
127             .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
128         || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
129         	!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
130       throw new IOException(
131           "Agg client Exception: Startrow should be smaller than Stoprow");
132     } else if (scan.getFamilyMap().size() != 1) {
133       throw new IOException("There must be only one family.");
134     }
135   }
136 
137   /**
138    * It gives the minimum value of a column for a given column family for the
139    * given range. In case qualifier is null, a min of all values for the given
140    * family is returned.
141    * @param tableName
142    * @param ci
143    * @param scan
144    * @return min val <R>
145    * @throws Throwable
146    */
147   public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
148       final Scan scan) throws Throwable {
149     validateParameters(scan);
150     class MinCallBack implements Batch.Callback<R> {
151 
152       private R min = null;
153 
154       public R getMinimum() {
155         return min;
156       }
157 
158       @Override
159       public synchronized void update(byte[] region, byte[] row, R result) {
160         min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
161       }
162     }
163     MinCallBack minCallBack = new MinCallBack();
164     HTable table = null;
165     try {
166       table = new HTable(conf, tableName);
167       table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
168           scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
169 
170             @Override
171             public R call(AggregateProtocol instance) throws IOException {
172               return instance.getMin(ci, scan);
173             }
174           }, minCallBack);
175     } finally {
176       if (table != null) {
177         table.close();
178       }
179     }
180     log.debug("Min fom all regions is: " + minCallBack.getMinimum());
181     return minCallBack.getMinimum();
182   }
183 
184   /**
185    * It gives the row count, by summing up the individual results obtained from
186    * regions. In case the qualifier is null, FirstKEyValueFilter is used to
187    * optimised the operation. In case qualifier is provided, I can't use the
188    * filter as it may set the flag to skip to next row, but the value read is
189    * not of the given filter: in this case, this particular row will not be
190    * counted ==> an error.
191    * @param tableName
192    * @param ci
193    * @param scan
194    * @return <R, S>
195    * @throws Throwable
196    */
197   public <R, S> long rowCount(final byte[] tableName,
198       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
199     validateParameters(scan);
200     class RowNumCallback implements Batch.Callback<Long> {
201       private final AtomicLong rowCountL = new AtomicLong(0);
202 
203       public long getRowNumCount() {
204         return rowCountL.get();
205       }
206 
207       @Override
208       public void update(byte[] region, byte[] row, Long result) {
209         rowCountL.addAndGet(result.longValue());
210       }
211     }
212     RowNumCallback rowNum = new RowNumCallback();
213     HTable table = null;
214     try {
215       table = new HTable(conf, tableName);
216       table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
217           scan.getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
218             @Override
219             public Long call(AggregateProtocol instance) throws IOException {
220               return instance.getRowNum(ci, scan);
221             }
222           }, rowNum);
223     } finally {
224       if (table != null) {
225         table.close();
226       }
227     }
228     return rowNum.getRowNumCount();
229   }
230 
231   /**
232    * It sums up the value returned from various regions. In case qualifier is
233    * null, summation of all the column qualifiers in the given family is done.
234    * @param tableName
235    * @param ci
236    * @param scan
237    * @return sum <S>
238    * @throws Throwable
239    */
240   public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
241       final Scan scan) throws Throwable {
242     validateParameters(scan);
243     class SumCallBack implements Batch.Callback<S> {
244       S sumVal = null;
245 
246       public S getSumResult() {
247         return sumVal;
248       }
249 
250       @Override
251       public synchronized void update(byte[] region, byte[] row, S result) {
252         sumVal = ci.add(sumVal, result);
253       }
254     }
255     SumCallBack sumCallBack = new SumCallBack();
256     HTable table = null;
257     try {
258       table = new HTable(conf, tableName);
259       table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
260           scan.getStopRow(), new Batch.Call<AggregateProtocol, S>() {
261             @Override
262             public S call(AggregateProtocol instance) throws IOException {
263               return instance.getSum(ci, scan);
264             }
265           }, sumCallBack);
266     } finally {
267       if (table != null) {
268         table.close();
269       }
270     }
271     return sumCallBack.getSumResult();
272   }
273 
274   /**
275    * It computes average while fetching sum and row count from all the
276    * corresponding regions. Approach is to compute a global sum of region level
277    * sum and rowcount and then compute the average.
278    * @param tableName
279    * @param scan
280    * @throws Throwable
281    */
282   private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
283       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
284     validateParameters(scan);
285     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
286       S sum = null;
287       Long rowCount = 0l;
288 
289       public Pair<S, Long> getAvgArgs() {
290         return new Pair<S, Long>(sum, rowCount);
291       }
292 
293       @Override
294       public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
295         sum = ci.add(sum, result.getFirst());
296         rowCount += result.getSecond();
297       }
298     }
299     AvgCallBack avgCallBack = new AvgCallBack();
300     HTable table = null;
301     try {
302       table = new HTable(conf, tableName);
303       table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
304           scan.getStopRow(),
305           new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
306             @Override
307             public Pair<S, Long> call(AggregateProtocol instance)
308                 throws IOException {
309               return instance.getAvg(ci, scan);
310             }
311           }, avgCallBack);
312     } finally {
313       if (table != null) {
314         table.close();
315       }
316     }
317     return avgCallBack.getAvgArgs();
318   }
319 
320   /**
321    * This is the client side interface/handle for calling the average method for
322    * a given cf-cq combination. It was necessary to add one more call stack as
323    * its return type should be a decimal value, irrespective of what
324    * columninterpreter says. So, this methods collects the necessary parameters
325    * to compute the average and returs the double value.
326    * @param tableName
327    * @param ci
328    * @param scan
329    * @return <R, S>
330    * @throws Throwable
331    */
332   public <R, S> double avg(final byte[] tableName,
333       final ColumnInterpreter<R, S> ci, Scan scan) throws Throwable {
334     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
335     return ci.divideForAvg(p.getFirst(), p.getSecond());
336   }
337 
338   /**
339    * It computes a global standard deviation for a given column and its value.
340    * Standard deviation is square root of (average of squares -
341    * average*average). From individual regions, it obtains sum, square sum and
342    * number of rows. With these, the above values are computed to get the global
343    * std.
344    * @param tableName
345    * @param scan
346    * @return
347    * @throws Throwable
348    */
349   private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
350       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
351     validateParameters(scan);
352     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
353       long rowCountVal = 0l;
354       S sumVal = null, sumSqVal = null;
355 
356       public Pair<List<S>, Long> getStdParams() {
357         List<S> l = new ArrayList<S>();
358         l.add(sumVal);
359         l.add(sumSqVal);
360         Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
361         return p;
362       }
363 
364       @Override
365       public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
366         sumVal = ci.add(sumVal, result.getFirst().get(0));
367         sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
368         rowCountVal += result.getSecond();
369       }
370     }
371     StdCallback stdCallback = new StdCallback();
372     HTable table = null;
373     try {
374       table = new HTable(conf, tableName);
375       table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
376           scan.getStopRow(),
377           new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
378             @Override
379             public Pair<List<S>, Long> call(AggregateProtocol instance)
380                 throws IOException {
381               return instance.getStd(ci, scan);
382             }
383 
384           }, stdCallback);
385     } finally {
386       if (table != null) {
387         table.close();
388       }
389     }
390     return stdCallback.getStdParams();
391   }
392 
393   /**
394    * This is the client side interface/handle for calling the std method for a
395    * given cf-cq combination. It was necessary to add one more call stack as its
396    * return type should be a decimal value, irrespective of what
397    * columninterpreter says. So, this methods collects the necessary parameters
398    * to compute the std and returns the double value.
399    * @param tableName
400    * @param ci
401    * @param scan
402    * @return <R, S>
403    * @throws Throwable
404    */
405   public <R, S> double std(final byte[] tableName, ColumnInterpreter<R, S> ci,
406       Scan scan) throws Throwable {
407     Pair<List<S>, Long> p = getStdArgs(tableName, ci, scan);
408     double res = 0d;
409     double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
410     double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
411     res = avgOfSumSq - (avg) * (avg); // variance
412     res = Math.pow(res, 0.5);
413     return res;
414   }
415 
416   /**
417    * It helps locate the region with median for a given column whose weight 
418    * is specified in an optional column.
419    * From individual regions, it obtains sum of values and sum of weights.
420    * @param tableName
421    * @param ci
422    * @param scan
423    * @return pair whose first element is a map between start row of the region
424    *  and (sum of values, sum of weights) for the region, the second element is
425    *  (sum of values, sum of weights) for all the regions chosen
426    * @throws Throwable
427    */
428   private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
429   getMedianArgs(final byte[] tableName,
430       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
431     validateParameters(scan);
432     final NavigableMap<byte[], List<S>> map =
433       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
434     class StdCallback implements Batch.Callback<List<S>> {
435       S sumVal = null, sumWeights = null;
436 
437       public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
438         List<S> l = new ArrayList<S>();
439         l.add(sumVal);
440         l.add(sumWeights);
441         Pair<NavigableMap<byte[], List<S>>, List<S>> p =
442           new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
443         return p;
444       }
445 
446       @Override
447       public synchronized void update(byte[] region, byte[] row, List<S> result) {
448         map.put(row, result);
449         sumVal = ci.add(sumVal, result.get(0));
450         sumWeights = ci.add(sumWeights, result.get(1));
451       }
452     }
453     StdCallback stdCallback = new StdCallback();
454     HTable table = null;
455     try {
456       table = new HTable(conf, tableName);
457       table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
458           scan.getStopRow(), new Batch.Call<AggregateProtocol, List<S>>() {
459             @Override
460             public List<S> call(AggregateProtocol instance) throws IOException {
461               return instance.getMedian(ci, scan);
462             }
463 
464           }, stdCallback);
465     } finally {
466       if (table != null) {
467         table.close();
468       }
469     }
470     return stdCallback.getMedianParams();
471   }
472 
473   /**
474    * This is the client side interface/handler for calling the median method for a
475    * given cf-cq combination. This method collects the necessary parameters
476    * to compute the median and returns the median.
477    * @param tableName
478    * @param ci
479    * @param scan
480    * @return R the median
481    * @throws Throwable
482    */
483   public <R, S> R median(final byte[] tableName, ColumnInterpreter<R, S> ci,
484       Scan scan) throws Throwable {
485     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
486     byte[] startRow = null;
487     byte[] colFamily = scan.getFamilies()[0];
488     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
489     NavigableMap<byte[], List<S>> map = p.getFirst();
490     S sumVal = p.getSecond().get(0);
491     S sumWeights = p.getSecond().get(1);
492     double halfSumVal = ci.divideForAvg(sumVal, 2L);
493     double movingSumVal = 0;
494     boolean weighted = false;
495     if (quals.size() > 1) {
496       weighted = true;
497       halfSumVal = ci.divideForAvg(sumWeights, 2L);
498     }
499     
500     for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
501       S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
502       double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
503       if (newSumVal > halfSumVal) break;  // we found the region with the median
504       movingSumVal = newSumVal;
505       startRow = entry.getKey();
506     }
507     // scan the region with median and find it
508     Scan scan2 = new Scan(scan);
509     // inherit stop row from method parameter
510     if (startRow != null) scan2.setStartRow(startRow);
511     HTable table = null;
512     ResultScanner scanner = null;
513     try {
514       table = new HTable(conf, tableName);
515       int cacheSize = scan2.getCaching();
516       if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
517         scan2.setCacheBlocks(true);
518         cacheSize = 5;
519         scan2.setCaching(cacheSize);
520       }
521       scanner = table.getScanner(scan2);
522       Result[] results = null;
523       byte[] qualifier = quals.pollFirst();
524       // qualifier for the weight column
525       byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
526       R value = null;
527       do {
528         results = scanner.next(cacheSize);
529         if (results != null && results.length > 0) {
530           for (int i = 0; i < results.length; i++) {
531             Result r = results[i];
532             // retrieve weight
533             KeyValue kv = r.getColumnLatest(colFamily, weightQualifier);
534             R newValue = ci.getValue(colFamily, weightQualifier, kv);
535             S s = ci.castToReturnType(newValue);
536             double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
537             // see if we have moved past the median
538             if (newSumVal > halfSumVal) {
539               return value;
540             }
541             movingSumVal = newSumVal;
542             kv = r.getColumnLatest(colFamily, qualifier);
543             value = ci.getValue(colFamily, qualifier, kv);
544           }
545         }
546       } while (results != null && results.length > 0);
547     } finally {
548       if (scanner != null) {
549         scanner.close();
550       }
551       if (table != null) {
552         table.close();
553       }
554     }
555     return null;
556   }
557 }