View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.NavigableSet;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.Coprocessor;
31  import org.apache.hadoop.hbase.CoprocessorEnvironment;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.client.Scan;
34  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
38  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
39  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
40  import org.apache.hadoop.hbase.regionserver.InternalScanner;
41  import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
42  
43  import com.google.protobuf.ByteString;
44  import com.google.protobuf.Message;
45  import com.google.protobuf.RpcCallback;
46  import com.google.protobuf.RpcController;
47  import com.google.protobuf.Service;
48  
49  /**
50   * A concrete AggregateProtocol implementation. Its system level coprocessor
51   * that computes the aggregate function at a region level.
52   * {@link ColumnInterpreter} is used to interpret column value. This class is
53   * parameterized with the following (these are the types with which the {@link ColumnInterpreter}
54   * is parameterized, and for more description on these, refer to {@link ColumnInterpreter}):
55   * @param <T> Cell value data type
56   * @param <S> Promoted data type
57   * @param <P> PB message that is used to transport initializer specific bytes
58   * @param <Q> PB message that is used to transport Cell (<T>) instance
59   * @param <R> PB message that is used to transport Promoted (<S>) instance
60   */
61  @InterfaceAudience.Private
62  public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message> 
63  extends AggregateService implements CoprocessorService, Coprocessor {
64    protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
65    private RegionCoprocessorEnvironment env;
66  
67    /**
68     * Gives the maximum for a given combination of column qualifier and column
69     * family, in the given row range as defined in the Scan object. In its
70     * current implementation, it takes one column family and one column qualifier
71     * (if provided). In case of null column qualifier, maximum value for the
72     * entire column family will be returned.
73     */
74    @Override
75    public void getMax(RpcController controller, AggregateRequest request,
76        RpcCallback<AggregateResponse> done) {
77      InternalScanner scanner = null;
78      AggregateResponse response = null;
79      T max = null;
80      try {
81        ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
82        T temp;
83        Scan scan = ProtobufUtil.toScan(request.getScan());
84        scanner = env.getRegion().getScanner(scan);
85        List<Cell> results = new ArrayList<Cell>();
86        byte[] colFamily = scan.getFamilies()[0];
87        NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
88        byte[] qualifier = null;
89        if (qualifiers != null && !qualifiers.isEmpty()) {
90          qualifier = qualifiers.pollFirst();
91        }
92        // qualifier can be null.
93        boolean hasMoreRows = false;
94        do {
95          hasMoreRows = NextState.hasMoreValues(scanner.next(results));
96          int listSize = results.size();
97          for (int i = 0; i < listSize; i++) {
98            temp = ci.getValue(colFamily, qualifier, results.get(i));
99            max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
100         }
101         results.clear();
102       } while (hasMoreRows);
103       if (max != null) {
104         AggregateResponse.Builder builder = AggregateResponse.newBuilder();
105         builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
106         response = builder.build();
107       }
108     } catch (IOException e) {
109       ResponseConverter.setControllerException(controller, e);
110     } finally {
111       if (scanner != null) {
112         try {
113           scanner.close();
114         } catch (IOException ignored) {}
115       }
116     }
117     log.info("Maximum from this region is "
118         + env.getRegion().getRegionNameAsString() + ": " + max);
119     done.run(response);
120   }
121 
122   /**
123    * Gives the minimum for a given combination of column qualifier and column
124    * family, in the given row range as defined in the Scan object. In its
125    * current implementation, it takes one column family and one column qualifier
126    * (if provided). In case of null column qualifier, minimum value for the
127    * entire column family will be returned.
128    */
129   @Override
130   public void getMin(RpcController controller, AggregateRequest request,
131       RpcCallback<AggregateResponse> done) {
132     AggregateResponse response = null;
133     InternalScanner scanner = null;
134     T min = null;
135     try {
136       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
137       T temp;
138       Scan scan = ProtobufUtil.toScan(request.getScan());
139       scanner = env.getRegion().getScanner(scan);
140       List<Cell> results = new ArrayList<Cell>();
141       byte[] colFamily = scan.getFamilies()[0];
142       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
143       byte[] qualifier = null;
144       if (qualifiers != null && !qualifiers.isEmpty()) {
145         qualifier = qualifiers.pollFirst();
146       }
147       boolean hasMoreRows = false;
148       do {
149         hasMoreRows = NextState.hasMoreValues(scanner.next(results));
150         int listSize = results.size();
151         for (int i = 0; i < listSize; i++) {
152           temp = ci.getValue(colFamily, qualifier, results.get(i));
153           min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
154         }
155         results.clear();
156       } while (hasMoreRows);
157       if (min != null) {
158         response = AggregateResponse.newBuilder().addFirstPart( 
159           ci.getProtoForCellType(min).toByteString()).build();
160       }
161     } catch (IOException e) {
162       ResponseConverter.setControllerException(controller, e);
163     } finally {
164       if (scanner != null) {
165         try {
166           scanner.close();
167         } catch (IOException ignored) {}
168       }
169     }
170     log.info("Minimum from this region is "
171         + env.getRegion().getRegionNameAsString() + ": " + min);
172     done.run(response);
173   }
174 
175   /**
176    * Gives the sum for a given combination of column qualifier and column
177    * family, in the given row range as defined in the Scan object. In its
178    * current implementation, it takes one column family and one column qualifier
179    * (if provided). In case of null column qualifier, sum for the entire column
180    * family will be returned.
181    */
182   @Override
183   public void getSum(RpcController controller, AggregateRequest request,
184       RpcCallback<AggregateResponse> done) {
185     AggregateResponse response = null;
186     InternalScanner scanner = null;
187     long sum = 0l;
188     try {
189       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
190       S sumVal = null;
191       T temp;
192       Scan scan = ProtobufUtil.toScan(request.getScan());
193       scanner = env.getRegion().getScanner(scan);
194       byte[] colFamily = scan.getFamilies()[0];
195       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
196       byte[] qualifier = null;
197       if (qualifiers != null && !qualifiers.isEmpty()) {
198         qualifier = qualifiers.pollFirst();
199       }
200       List<Cell> results = new ArrayList<Cell>();
201       boolean hasMoreRows = false;
202       do {
203         hasMoreRows = NextState.hasMoreValues(scanner.next(results));
204         int listSize = results.size();
205         for (int i = 0; i < listSize; i++) {
206           temp = ci.getValue(colFamily, qualifier, results.get(i));
207           if (temp != null)
208             sumVal = ci.add(sumVal, ci.castToReturnType(temp));
209         }
210         results.clear();
211       } while (hasMoreRows);
212       if (sumVal != null) {
213         response = AggregateResponse.newBuilder().addFirstPart( 
214           ci.getProtoForPromotedType(sumVal).toByteString()).build();
215       }
216     } catch (IOException e) {
217       ResponseConverter.setControllerException(controller, e);
218     } finally {
219       if (scanner != null) {
220         try {
221           scanner.close();
222         } catch (IOException ignored) {}
223       }
224     }
225     log.debug("Sum from this region is "
226         + env.getRegion().getRegionNameAsString() + ": " + sum);
227     done.run(response);
228   }
229 
230   /**
231    * Gives the row count for the given column family and column qualifier, in
232    * the given row range as defined in the Scan object.
233    * @throws IOException
234    */
235   @Override
236   public void getRowNum(RpcController controller, AggregateRequest request,
237       RpcCallback<AggregateResponse> done) {
238     AggregateResponse response = null;
239     long counter = 0l;
240     List<Cell> results = new ArrayList<Cell>();
241     InternalScanner scanner = null;
242     try {
243       Scan scan = ProtobufUtil.toScan(request.getScan());
244       byte[][] colFamilies = scan.getFamilies();
245       byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
246       NavigableSet<byte[]> qualifiers = colFamilies != null ?
247           scan.getFamilyMap().get(colFamily) : null;
248       byte[] qualifier = null;
249       if (qualifiers != null && !qualifiers.isEmpty()) {
250         qualifier = qualifiers.pollFirst();
251       }
252       if (scan.getFilter() == null && qualifier == null)
253         scan.setFilter(new FirstKeyOnlyFilter());
254       scanner = env.getRegion().getScanner(scan);
255       boolean hasMoreRows = false;
256       do {
257         hasMoreRows = NextState.hasMoreValues(scanner.next(results));
258         if (results.size() > 0) {
259           counter++;
260         }
261         results.clear();
262       } while (hasMoreRows);
263       ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
264       bb.rewind();
265       response = AggregateResponse.newBuilder().addFirstPart( 
266           ByteString.copyFrom(bb)).build();
267     } catch (IOException e) {
268       ResponseConverter.setControllerException(controller, e);
269     } finally {
270       if (scanner != null) {
271         try {
272           scanner.close();
273         } catch (IOException ignored) {}
274       }
275     }
276     log.info("Row counter from this region is "
277         + env.getRegion().getRegionNameAsString() + ": " + counter);
278     done.run(response);
279   }
280 
281   /**
282    * Gives a Pair with first object as Sum and second object as row count,
283    * computed for a given combination of column qualifier and column family in
284    * the given row range as defined in the Scan object. In its current
285    * implementation, it takes one column family and one column qualifier (if
286    * provided). In case of null column qualifier, an aggregate sum over all the
287    * entire column family will be returned.
288    * <p>
289    * The average is computed in
290    * AggregationClient#avg(byte[], ColumnInterpreter, Scan) by
291    * processing results from all regions, so its "ok" to pass sum and a Long
292    * type.
293    */
294   @Override
295   public void getAvg(RpcController controller, AggregateRequest request,
296       RpcCallback<AggregateResponse> done) {
297     AggregateResponse response = null;
298     InternalScanner scanner = null;
299     try {
300       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
301       S sumVal = null;
302       Long rowCountVal = 0l;
303       Scan scan = ProtobufUtil.toScan(request.getScan());
304       scanner = env.getRegion().getScanner(scan);
305       byte[] colFamily = scan.getFamilies()[0];
306       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
307       byte[] qualifier = null;
308       if (qualifiers != null && !qualifiers.isEmpty()) {
309         qualifier = qualifiers.pollFirst();
310       }
311       List<Cell> results = new ArrayList<Cell>();
312       boolean hasMoreRows = false;
313     
314       do {
315         results.clear();
316         hasMoreRows = NextState.hasMoreValues(scanner.next(results));
317         int listSize = results.size();
318         for (int i = 0; i < listSize; i++) {
319           sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
320               qualifier, results.get(i))));
321         }
322         rowCountVal++;
323       } while (hasMoreRows);
324       if (sumVal != null) {
325         ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
326         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
327         pair.addFirstPart(first);
328         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
329         bb.rewind();
330         pair.setSecondPart(ByteString.copyFrom(bb));
331         response = pair.build();
332       }
333     } catch (IOException e) {
334       ResponseConverter.setControllerException(controller, e);
335     } finally {
336       if (scanner != null) {
337         try {
338           scanner.close();
339         } catch (IOException ignored) {}
340       }
341     }
342     done.run(response);
343   }
344 
345   /**
346    * Gives a Pair with first object a List containing Sum and sum of squares,
347    * and the second object as row count. It is computed for a given combination of
348    * column qualifier and column family in the given row range as defined in the
349    * Scan object. In its current implementation, it takes one column family and
350    * one column qualifier (if provided). The idea is get the value of variance first:
351    * the average of the squares less the square of the average a standard
352    * deviation is square root of variance.
353    */
354   @Override
355   public void getStd(RpcController controller, AggregateRequest request,
356       RpcCallback<AggregateResponse> done) {
357     InternalScanner scanner = null;
358     AggregateResponse response = null;
359     try {
360       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
361       S sumVal = null, sumSqVal = null, tempVal = null;
362       long rowCountVal = 0l;
363       Scan scan = ProtobufUtil.toScan(request.getScan());
364       scanner = env.getRegion().getScanner(scan);
365       byte[] colFamily = scan.getFamilies()[0];
366       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
367       byte[] qualifier = null;
368       if (qualifiers != null && !qualifiers.isEmpty()) {
369         qualifier = qualifiers.pollFirst();
370       }
371       List<Cell> results = new ArrayList<Cell>();
372 
373       boolean hasMoreRows = false;
374     
375       do {
376         tempVal = null;
377         hasMoreRows = NextState.hasMoreValues(scanner.next(results));
378         int listSize = results.size();
379         for (int i = 0; i < listSize; i++) {
380           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
381               qualifier, results.get(i))));
382         }
383         results.clear();
384         sumVal = ci.add(sumVal, tempVal);
385         sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
386         rowCountVal++;
387       } while (hasMoreRows);
388       if (sumVal != null) {
389         ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
390         ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
391         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
392         pair.addFirstPart(first_sumVal);
393         pair.addFirstPart(first_sumSqVal);
394         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
395         bb.rewind();
396         pair.setSecondPart(ByteString.copyFrom(bb));
397         response = pair.build();
398       }
399     } catch (IOException e) {
400       ResponseConverter.setControllerException(controller, e);
401     } finally {
402       if (scanner != null) {
403         try {
404           scanner.close();
405         } catch (IOException ignored) {}
406       }
407     }
408     done.run(response);
409   }
410 
411   /**
412    * Gives a List containing sum of values and sum of weights.
413    * It is computed for the combination of column
414    * family and column qualifier(s) in the given row range as defined in the
415    * Scan object. In its current implementation, it takes one column family and
416    * two column qualifiers. The first qualifier is for values column and 
417    * the second qualifier (optional) is for weight column.
418    */
419   @Override
420   public void getMedian(RpcController controller, AggregateRequest request,
421       RpcCallback<AggregateResponse> done) {
422     AggregateResponse response = null;
423     InternalScanner scanner = null;
424     try {
425       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
426       S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
427       Scan scan = ProtobufUtil.toScan(request.getScan());
428       scanner = env.getRegion().getScanner(scan);
429       byte[] colFamily = scan.getFamilies()[0];
430       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
431       byte[] valQualifier = null, weightQualifier = null;
432       if (qualifiers != null && !qualifiers.isEmpty()) {
433         valQualifier = qualifiers.pollFirst();
434         // if weighted median is requested, get qualifier for the weight column
435         weightQualifier = qualifiers.pollLast();
436       }
437       List<Cell> results = new ArrayList<Cell>();
438 
439       boolean hasMoreRows = false;
440     
441       do {
442         tempVal = null;
443         tempWeight = null;
444         hasMoreRows = NextState.hasMoreValues(scanner.next(results));
445         int listSize = results.size();
446         for (int i = 0; i < listSize; i++) {
447           Cell kv = results.get(i);
448           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
449               valQualifier, kv)));
450           if (weightQualifier != null) {
451             tempWeight = ci.add(tempWeight,
452                 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
453           }
454         }
455         results.clear();
456         sumVal = ci.add(sumVal, tempVal);
457         sumWeights = ci.add(sumWeights, tempWeight);
458       } while (hasMoreRows);
459       ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
460       S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
461       ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
462       AggregateResponse.Builder pair = AggregateResponse.newBuilder();
463       pair.addFirstPart(first_sumVal);
464       pair.addFirstPart(first_sumWeights); 
465       response = pair.build();
466     } catch (IOException e) {
467       ResponseConverter.setControllerException(controller, e);
468     } finally {
469       if (scanner != null) {
470         try {
471           scanner.close();
472         } catch (IOException ignored) {}
473       }
474     }
475     done.run(response);
476   }
477 
478   @SuppressWarnings("unchecked")
479   ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
480       AggregateRequest request) throws IOException {
481     String className = request.getInterpreterClassName();
482     Class<?> cls;
483     try {
484       cls = Class.forName(className);
485       ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
486       if (request.hasInterpreterSpecificBytes()) {
487         ByteString b = request.getInterpreterSpecificBytes();
488         P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
489         ci.initialize(initMsg);
490       }
491       return ci;
492     } catch (ClassNotFoundException e) {
493       throw new IOException(e);
494     } catch (InstantiationException e) {
495       throw new IOException(e);
496     } catch (IllegalAccessException e) {
497       throw new IOException(e);
498     }
499   }
500 
501   @Override
502   public Service getService() {
503     return this;
504   }
505 
506   /**
507    * Stores a reference to the coprocessor environment provided by the
508    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
509    * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
510    * on a table region, so always expects this to be an instance of
511    * {@link RegionCoprocessorEnvironment}.
512    * @param env the environment provided by the coprocessor host
513    * @throws IOException if the provided environment is not an instance of
514    * {@code RegionCoprocessorEnvironment}
515    */
516   @Override
517   public void start(CoprocessorEnvironment env) throws IOException {
518     if (env instanceof RegionCoprocessorEnvironment) {
519       this.env = (RegionCoprocessorEnvironment)env;
520     } else {
521       throw new CoprocessorException("Must be loaded on a table region!");
522     }
523   }
524 
525   @Override
526   public void stop(CoprocessorEnvironment env) throws IOException {
527     // nothing to do
528   }
529   
530 }