View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.NavigableSet;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.Coprocessor;
31  import org.apache.hadoop.hbase.CoprocessorEnvironment;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.client.Scan;
34  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
38  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
39  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
40  import org.apache.hadoop.hbase.regionserver.InternalScanner;
41  
42  import com.google.protobuf.ByteString;
43  import com.google.protobuf.Message;
44  import com.google.protobuf.RpcCallback;
45  import com.google.protobuf.RpcController;
46  import com.google.protobuf.Service;
47  
48  /**
49   * A concrete AggregateProtocol implementation. Its system level coprocessor
50   * that computes the aggregate function at a region level.
51   * {@link ColumnInterpreter} is used to interpret column value. This class is
52   * parameterized with the following (these are the types with which the {@link ColumnInterpreter}
53   * is parameterized, and for more description on these, refer to {@link ColumnInterpreter}):
54   * @param T Cell value data type
55   * @param S Promoted data type
56   * @param P PB message that is used to transport initializer specific bytes
57   * @param Q PB message that is used to transport Cell (<T>) instance
58   * @param R PB message that is used to transport Promoted (<S>) instance
59   */
60  @InterfaceAudience.Private
61  public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message> 
62  extends AggregateService implements CoprocessorService, Coprocessor {
63    protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
64    private RegionCoprocessorEnvironment env;
65  
66    /**
67     * Gives the maximum for a given combination of column qualifier and column
68     * family, in the given row range as defined in the Scan object. In its
69     * current implementation, it takes one column family and one column qualifier
70     * (if provided). In case of null column qualifier, maximum value for the
71     * entire column family will be returned.
72     */
73    @Override
74    public void getMax(RpcController controller, AggregateRequest request,
75        RpcCallback<AggregateResponse> done) {
76      InternalScanner scanner = null;
77      AggregateResponse response = null;
78      T max = null;
79      try {
80        ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
81        T temp;
82        Scan scan = ProtobufUtil.toScan(request.getScan());
83        scanner = env.getRegion().getScanner(scan);
84        List<Cell> results = new ArrayList<Cell>();
85        byte[] colFamily = scan.getFamilies()[0];
86        NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
87        byte[] qualifier = null;
88        if (qualifiers != null && !qualifiers.isEmpty()) {
89          qualifier = qualifiers.pollFirst();
90        }
91        // qualifier can be null.
92        boolean hasMoreRows = false;
93        do {
94          hasMoreRows = scanner.next(results);
95          int listSize = results.size();
96          for (int i = 0; i < listSize; i++) {
97            temp = ci.getValue(colFamily, qualifier, results.get(i));
98            max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
99          }
100         results.clear();
101       } while (hasMoreRows);
102       if (max != null) {
103         AggregateResponse.Builder builder = AggregateResponse.newBuilder();
104         builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
105         response = builder.build();
106       }
107     } catch (IOException e) {
108       ResponseConverter.setControllerException(controller, e);
109     } finally {
110       if (scanner != null) {
111         try {
112           scanner.close();
113         } catch (IOException ignored) {}
114       }
115     }
116     log.info("Maximum from this region is "
117         + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + max);
118     done.run(response);
119   }
120 
121   /**
122    * Gives the minimum for a given combination of column qualifier and column
123    * family, in the given row range as defined in the Scan object. In its
124    * current implementation, it takes one column family and one column qualifier
125    * (if provided). In case of null column qualifier, minimum value for the
126    * entire column family will be returned.
127    */
128   @Override
129   public void getMin(RpcController controller, AggregateRequest request,
130       RpcCallback<AggregateResponse> done) {
131     AggregateResponse response = null;
132     InternalScanner scanner = null;
133     T min = null;
134     try {
135       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
136       T temp;
137       Scan scan = ProtobufUtil.toScan(request.getScan());
138       scanner = env.getRegion().getScanner(scan);
139       List<Cell> results = new ArrayList<Cell>();
140       byte[] colFamily = scan.getFamilies()[0];
141       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
142       byte[] qualifier = null;
143       if (qualifiers != null && !qualifiers.isEmpty()) {
144         qualifier = qualifiers.pollFirst();
145       }
146       boolean hasMoreRows = false;
147       do {
148         hasMoreRows = scanner.next(results);
149         int listSize = results.size();
150         for (int i = 0; i < listSize; i++) {
151           temp = ci.getValue(colFamily, qualifier, results.get(i));
152           min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
153         }
154         results.clear();
155       } while (hasMoreRows);
156       if (min != null) {
157         response = AggregateResponse.newBuilder().addFirstPart( 
158           ci.getProtoForCellType(min).toByteString()).build();
159       }
160     } catch (IOException e) {
161       ResponseConverter.setControllerException(controller, e);
162     } finally {
163       if (scanner != null) {
164         try {
165           scanner.close();
166         } catch (IOException ignored) {}
167       }
168     }
169     log.info("Minimum from this region is "
170         + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + min);
171     done.run(response);
172   }
173 
174   /**
175    * Gives the sum for a given combination of column qualifier and column
176    * family, in the given row range as defined in the Scan object. In its
177    * current implementation, it takes one column family and one column qualifier
178    * (if provided). In case of null column qualifier, sum for the entire column
179    * family will be returned.
180    */
181   @Override
182   public void getSum(RpcController controller, AggregateRequest request,
183       RpcCallback<AggregateResponse> done) {
184     AggregateResponse response = null;
185     InternalScanner scanner = null;
186     long sum = 0l;
187     try {
188       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
189       S sumVal = null;
190       T temp;
191       Scan scan = ProtobufUtil.toScan(request.getScan());
192       scanner = env.getRegion().getScanner(scan);
193       byte[] colFamily = scan.getFamilies()[0];
194       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
195       byte[] qualifier = null;
196       if (qualifiers != null && !qualifiers.isEmpty()) {
197         qualifier = qualifiers.pollFirst();
198       }
199       List<Cell> results = new ArrayList<Cell>();
200       boolean hasMoreRows = false;
201       do {
202         hasMoreRows = scanner.next(results);
203         int listSize = results.size();
204         for (int i = 0; i < listSize; i++) {
205           temp = ci.getValue(colFamily, qualifier, results.get(i));
206           if (temp != null)
207             sumVal = ci.add(sumVal, ci.castToReturnType(temp));
208         }
209         results.clear();
210       } while (hasMoreRows);
211       if (sumVal != null) {
212         response = AggregateResponse.newBuilder().addFirstPart( 
213           ci.getProtoForPromotedType(sumVal).toByteString()).build();
214       }
215     } catch (IOException e) {
216       ResponseConverter.setControllerException(controller, e);
217     } finally {
218       if (scanner != null) {
219         try {
220           scanner.close();
221         } catch (IOException ignored) {}
222       }
223     }
224     log.debug("Sum from this region is "
225         + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum);
226     done.run(response);
227   }
228 
229   /**
230    * Gives the row count for the given column family and column qualifier, in
231    * the given row range as defined in the Scan object.
232    */
233   @Override
234   public void getRowNum(RpcController controller, AggregateRequest request,
235       RpcCallback<AggregateResponse> done) {
236     AggregateResponse response = null;
237     long counter = 0l;
238     List<Cell> results = new ArrayList<Cell>();
239     InternalScanner scanner = null;
240     try {
241       Scan scan = ProtobufUtil.toScan(request.getScan());
242       byte[][] colFamilies = scan.getFamilies();
243       byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
244       NavigableSet<byte[]> qualifiers = colFamilies != null ?
245           scan.getFamilyMap().get(colFamily) : null;
246       byte[] qualifier = null;
247       if (qualifiers != null && !qualifiers.isEmpty()) {
248         qualifier = qualifiers.pollFirst();
249       }
250       if (scan.getFilter() == null && qualifier == null)
251         scan.setFilter(new FirstKeyOnlyFilter());
252       scanner = env.getRegion().getScanner(scan);
253       boolean hasMoreRows = false;
254       do {
255         hasMoreRows = scanner.next(results);
256         if (results.size() > 0) {
257           counter++;
258         }
259         results.clear();
260       } while (hasMoreRows);
261       ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
262       bb.rewind();
263       response = AggregateResponse.newBuilder().addFirstPart( 
264           ByteString.copyFrom(bb)).build();
265     } catch (IOException e) {
266       ResponseConverter.setControllerException(controller, e);
267     } finally {
268       if (scanner != null) {
269         try {
270           scanner.close();
271         } catch (IOException ignored) {}
272       }
273     }
274     log.info("Row counter from this region is "
275         + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
276     done.run(response);
277   }
278 
279   /**
280    * Gives a Pair with first object as Sum and second object as row count,
281    * computed for a given combination of column qualifier and column family in
282    * the given row range as defined in the Scan object. In its current
283    * implementation, it takes one column family and one column qualifier (if
284    * provided). In case of null column qualifier, an aggregate sum over all the
285    * entire column family will be returned.
286    * <p>
287    * The average is computed in
288    * AggregationClient#avg(byte[], ColumnInterpreter, Scan) by
289    * processing results from all regions, so its "ok" to pass sum and a Long
290    * type.
291    */
292   @Override
293   public void getAvg(RpcController controller, AggregateRequest request,
294       RpcCallback<AggregateResponse> done) {
295     AggregateResponse response = null;
296     InternalScanner scanner = null;
297     try {
298       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
299       S sumVal = null;
300       Long rowCountVal = 0l;
301       Scan scan = ProtobufUtil.toScan(request.getScan());
302       scanner = env.getRegion().getScanner(scan);
303       byte[] colFamily = scan.getFamilies()[0];
304       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
305       byte[] qualifier = null;
306       if (qualifiers != null && !qualifiers.isEmpty()) {
307         qualifier = qualifiers.pollFirst();
308       }
309       List<Cell> results = new ArrayList<Cell>();
310       boolean hasMoreRows = false;
311     
312       do {
313         results.clear();
314         hasMoreRows = scanner.next(results);
315         int listSize = results.size();
316         for (int i = 0; i < listSize; i++) {
317           sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
318               qualifier, results.get(i))));
319         }
320         rowCountVal++;
321       } while (hasMoreRows);
322       if (sumVal != null) {
323         ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
324         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
325         pair.addFirstPart(first);
326         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
327         bb.rewind();
328         pair.setSecondPart(ByteString.copyFrom(bb));
329         response = pair.build();
330       }
331     } catch (IOException e) {
332       ResponseConverter.setControllerException(controller, e);
333     } finally {
334       if (scanner != null) {
335         try {
336           scanner.close();
337         } catch (IOException ignored) {}
338       }
339     }
340     done.run(response);
341   }
342 
343   /**
344    * Gives a Pair with first object a List containing Sum and sum of squares,
345    * and the second object as row count. It is computed for a given combination of
346    * column qualifier and column family in the given row range as defined in the
347    * Scan object. In its current implementation, it takes one column family and
348    * one column qualifier (if provided). The idea is get the value of variance first:
349    * the average of the squares less the square of the average a standard
350    * deviation is square root of variance.
351    */
352   @Override
353   public void getStd(RpcController controller, AggregateRequest request,
354       RpcCallback<AggregateResponse> done) {
355     InternalScanner scanner = null;
356     AggregateResponse response = null;
357     try {
358       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
359       S sumVal = null, sumSqVal = null, tempVal = null;
360       long rowCountVal = 0l;
361       Scan scan = ProtobufUtil.toScan(request.getScan());
362       scanner = env.getRegion().getScanner(scan);
363       byte[] colFamily = scan.getFamilies()[0];
364       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
365       byte[] qualifier = null;
366       if (qualifiers != null && !qualifiers.isEmpty()) {
367         qualifier = qualifiers.pollFirst();
368       }
369       List<Cell> results = new ArrayList<Cell>();
370 
371       boolean hasMoreRows = false;
372     
373       do {
374         tempVal = null;
375         hasMoreRows = scanner.next(results);
376         int listSize = results.size();
377         for (int i = 0; i < listSize; i++) {
378           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
379               qualifier, results.get(i))));
380         }
381         results.clear();
382         sumVal = ci.add(sumVal, tempVal);
383         sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
384         rowCountVal++;
385       } while (hasMoreRows);
386       if (sumVal != null) {
387         ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
388         ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
389         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
390         pair.addFirstPart(first_sumVal);
391         pair.addFirstPart(first_sumSqVal);
392         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
393         bb.rewind();
394         pair.setSecondPart(ByteString.copyFrom(bb));
395         response = pair.build();
396       }
397     } catch (IOException e) {
398       ResponseConverter.setControllerException(controller, e);
399     } finally {
400       if (scanner != null) {
401         try {
402           scanner.close();
403         } catch (IOException ignored) {}
404       }
405     }
406     done.run(response);
407   }
408 
409   /**
410    * Gives a List containing sum of values and sum of weights.
411    * It is computed for the combination of column
412    * family and column qualifier(s) in the given row range as defined in the
413    * Scan object. In its current implementation, it takes one column family and
414    * two column qualifiers. The first qualifier is for values column and 
415    * the second qualifier (optional) is for weight column.
416    */
417   @Override
418   public void getMedian(RpcController controller, AggregateRequest request,
419       RpcCallback<AggregateResponse> done) {
420     AggregateResponse response = null;
421     InternalScanner scanner = null;
422     try {
423       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
424       S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
425       Scan scan = ProtobufUtil.toScan(request.getScan());
426       scanner = env.getRegion().getScanner(scan);
427       byte[] colFamily = scan.getFamilies()[0];
428       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
429       byte[] valQualifier = null, weightQualifier = null;
430       if (qualifiers != null && !qualifiers.isEmpty()) {
431         valQualifier = qualifiers.pollFirst();
432         // if weighted median is requested, get qualifier for the weight column
433         weightQualifier = qualifiers.pollLast();
434       }
435       List<Cell> results = new ArrayList<Cell>();
436 
437       boolean hasMoreRows = false;
438     
439       do {
440         tempVal = null;
441         tempWeight = null;
442         hasMoreRows = scanner.next(results);
443         int listSize = results.size();
444         for (int i = 0; i < listSize; i++) {
445           Cell kv = results.get(i);
446           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
447               valQualifier, kv)));
448           if (weightQualifier != null) {
449             tempWeight = ci.add(tempWeight,
450                 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
451           }
452         }
453         results.clear();
454         sumVal = ci.add(sumVal, tempVal);
455         sumWeights = ci.add(sumWeights, tempWeight);
456       } while (hasMoreRows);
457       ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
458       S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
459       ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
460       AggregateResponse.Builder pair = AggregateResponse.newBuilder();
461       pair.addFirstPart(first_sumVal);
462       pair.addFirstPart(first_sumWeights); 
463       response = pair.build();
464     } catch (IOException e) {
465       ResponseConverter.setControllerException(controller, e);
466     } finally {
467       if (scanner != null) {
468         try {
469           scanner.close();
470         } catch (IOException ignored) {}
471       }
472     }
473     done.run(response);
474   }
475 
476   @SuppressWarnings("unchecked")
477   ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
478       AggregateRequest request) throws IOException {
479     String className = request.getInterpreterClassName();
480     Class<?> cls;
481     try {
482       cls = Class.forName(className);
483       ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
484       if (request.hasInterpreterSpecificBytes()) {
485         ByteString b = request.getInterpreterSpecificBytes();
486         P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
487         ci.initialize(initMsg);
488       }
489       return ci;
490     } catch (ClassNotFoundException e) {
491       throw new IOException(e);
492     } catch (InstantiationException e) {
493       throw new IOException(e);
494     } catch (IllegalAccessException e) {
495       throw new IOException(e);
496     }
497   }
498 
499   @Override
500   public Service getService() {
501     return this;
502   }
503 
504   /**
505    * Stores a reference to the coprocessor environment provided by the
506    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
507    * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
508    * on a table region, so always expects this to be an instance of
509    * {@link RegionCoprocessorEnvironment}.
510    * @param env the environment provided by the coprocessor host
511    * @throws IOException if the provided environment is not an instance of
512    * {@code RegionCoprocessorEnvironment}
513    */
514   @Override
515   public void start(CoprocessorEnvironment env) throws IOException {
516     if (env instanceof RegionCoprocessorEnvironment) {
517       this.env = (RegionCoprocessorEnvironment)env;
518     } else {
519       throw new CoprocessorException("Must be loaded on a table region!");
520     }
521   }
522 
523   @Override
524   public void stop(CoprocessorEnvironment env) throws IOException {
525     // nothing to do
526   }
527   
528 }