View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.NavigableSet;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.Coprocessor;
32  import org.apache.hadoop.hbase.CoprocessorEnvironment;
33  import org.apache.hadoop.hbase.client.Scan;
34  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
38  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
39  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
40  import org.apache.hadoop.hbase.regionserver.InternalScanner;
41  
42  import com.google.protobuf.ByteString;
43  import com.google.protobuf.Message;
44  import com.google.protobuf.RpcCallback;
45  import com.google.protobuf.RpcController;
46  import com.google.protobuf.Service;
47  
48  /**
 * A concrete AggregateProtocol implementation. It is a system-level coprocessor
 * that computes the aggregate function at a region level.
51   * {@link ColumnInterpreter} is used to interpret column value. This class is
52   * parameterized with the following (these are the types with which the {@link ColumnInterpreter}
53   * is parameterized, and for more description on these, refer to {@link ColumnInterpreter}):
54   * @param <T> Cell value data type
55   * @param <S> Promoted data type
56   * @param <P> PB message that is used to transport initializer specific bytes
57   * @param <Q> PB message that is used to transport Cell (<T>) instance
58   * @param <R> PB message that is used to transport Promoted (<S>) instance
59   */
60  @InterfaceAudience.Private
61  public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message> 
62  extends AggregateService implements CoprocessorService, Coprocessor {
63    protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
64    private RegionCoprocessorEnvironment env;
65  
66    /**
67     * Gives the maximum for a given combination of column qualifier and column
68     * family, in the given row range as defined in the Scan object. In its
69     * current implementation, it takes one column family and one column qualifier
70     * (if provided). In case of null column qualifier, maximum value for the
71     * entire column family will be returned.
72     */
73    @Override
74    public void getMax(RpcController controller, AggregateRequest request,
75        RpcCallback<AggregateResponse> done) {
76      InternalScanner scanner = null;
77      AggregateResponse response = null;
78      T max = null;
79      try {
80        ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
81        T temp;
82        Scan scan = ProtobufUtil.toScan(request.getScan());
83        scanner = env.getRegion().getScanner(scan);
84        List<Cell> results = new ArrayList<Cell>();
85        byte[] colFamily = scan.getFamilies()[0];
86        NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
87        byte[] qualifier = null;
88        if (qualifiers != null && !qualifiers.isEmpty()) {
89          qualifier = qualifiers.pollFirst();
90        }
91        // qualifier can be null.
92        boolean hasMoreRows = false;
93        do {
94          hasMoreRows = scanner.next(results);
95          for (Cell kv : results) {
96            temp = ci.getValue(colFamily, qualifier, kv);
97            max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
98          }
99          results.clear();
100       } while (hasMoreRows);
101       if (max != null) {
102         AggregateResponse.Builder builder = AggregateResponse.newBuilder();
103         builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
104         response = builder.build();
105       }
106     } catch (IOException e) {
107       ResponseConverter.setControllerException(controller, e);
108     } finally {
109       if (scanner != null) {
110         try {
111           scanner.close();
112         } catch (IOException ignored) {}
113       }
114     }
115     log.info("Maximum from this region is "
116         + env.getRegion().getRegionNameAsString() + ": " + max);
117     done.run(response);
118   }
119 
120   /**
121    * Gives the minimum for a given combination of column qualifier and column
122    * family, in the given row range as defined in the Scan object. In its
123    * current implementation, it takes one column family and one column qualifier
124    * (if provided). In case of null column qualifier, minimum value for the
125    * entire column family will be returned.
126    */
127   @Override
128   public void getMin(RpcController controller, AggregateRequest request,
129       RpcCallback<AggregateResponse> done) {
130     AggregateResponse response = null;
131     InternalScanner scanner = null;
132     T min = null;
133     try {
134       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
135       T temp;
136       Scan scan = ProtobufUtil.toScan(request.getScan());
137       scanner = env.getRegion().getScanner(scan);
138       List<Cell> results = new ArrayList<Cell>();
139       byte[] colFamily = scan.getFamilies()[0];
140       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
141       byte[] qualifier = null;
142       if (qualifiers != null && !qualifiers.isEmpty()) {
143         qualifier = qualifiers.pollFirst();
144       }
145       boolean hasMoreRows = false;
146       do {
147         hasMoreRows = scanner.next(results);
148         for (Cell kv : results) {
149           temp = ci.getValue(colFamily, qualifier, kv);
150           min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
151         }
152         results.clear();
153       } while (hasMoreRows);
154       if (min != null) {
155         response = AggregateResponse.newBuilder().addFirstPart( 
156           ci.getProtoForCellType(min).toByteString()).build();
157       }
158     } catch (IOException e) {
159       ResponseConverter.setControllerException(controller, e);
160     } finally {
161       if (scanner != null) {
162         try {
163           scanner.close();
164         } catch (IOException ignored) {}
165       }
166     }
167     log.info("Minimum from this region is "
168         + env.getRegion().getRegionNameAsString() + ": " + min);
169     done.run(response);
170   }
171 
172   /**
173    * Gives the sum for a given combination of column qualifier and column
174    * family, in the given row range as defined in the Scan object. In its
175    * current implementation, it takes one column family and one column qualifier
176    * (if provided). In case of null column qualifier, sum for the entire column
177    * family will be returned.
178    */
179   @Override
180   public void getSum(RpcController controller, AggregateRequest request,
181       RpcCallback<AggregateResponse> done) {
182     AggregateResponse response = null;
183     InternalScanner scanner = null;
184     long sum = 0l;
185     try {
186       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
187       S sumVal = null;
188       T temp;
189       Scan scan = ProtobufUtil.toScan(request.getScan());
190       scanner = env.getRegion().getScanner(scan);
191       byte[] colFamily = scan.getFamilies()[0];
192       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
193       byte[] qualifier = null;
194       if (qualifiers != null && !qualifiers.isEmpty()) {
195         qualifier = qualifiers.pollFirst();
196       }
197       List<Cell> results = new ArrayList<Cell>();
198       boolean hasMoreRows = false;
199       do {
200         hasMoreRows = scanner.next(results);
201         for (Cell kv : results) {
202           temp = ci.getValue(colFamily, qualifier, kv);
203           if (temp != null)
204             sumVal = ci.add(sumVal, ci.castToReturnType(temp));
205         }
206         results.clear();
207       } while (hasMoreRows);
208       if (sumVal != null) {
209         response = AggregateResponse.newBuilder().addFirstPart( 
210           ci.getProtoForPromotedType(sumVal).toByteString()).build();
211       }
212     } catch (IOException e) {
213       ResponseConverter.setControllerException(controller, e);
214     } finally {
215       if (scanner != null) {
216         try {
217           scanner.close();
218         } catch (IOException ignored) {}
219       }
220     }
221     log.debug("Sum from this region is "
222         + env.getRegion().getRegionNameAsString() + ": " + sum);
223     done.run(response);
224   }
225 
226   /**
227    * Gives the row count for the given column family and column qualifier, in
228    * the given row range as defined in the Scan object.
229    * @throws IOException
230    */
231   @Override
232   public void getRowNum(RpcController controller, AggregateRequest request,
233       RpcCallback<AggregateResponse> done) {
234     AggregateResponse response = null;
235     long counter = 0l;
236     List<Cell> results = new ArrayList<Cell>();
237     InternalScanner scanner = null;
238     try {
239       Scan scan = ProtobufUtil.toScan(request.getScan());
240       byte[][] colFamilies = scan.getFamilies();
241       byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
242       NavigableSet<byte[]> qualifiers = colFamilies != null ?
243           scan.getFamilyMap().get(colFamily) : null;
244       byte[] qualifier = null;
245       if (qualifiers != null && !qualifiers.isEmpty()) {
246         qualifier = qualifiers.pollFirst();
247       }
248       if (scan.getFilter() == null && qualifier == null)
249         scan.setFilter(new FirstKeyOnlyFilter());
250       scanner = env.getRegion().getScanner(scan);
251       boolean hasMoreRows = false;
252       do {
253         hasMoreRows = scanner.next(results);
254         if (results.size() > 0) {
255           counter++;
256         }
257         results.clear();
258       } while (hasMoreRows);
259       ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
260       bb.rewind();
261       response = AggregateResponse.newBuilder().addFirstPart( 
262           ByteString.copyFrom(bb)).build();
263     } catch (IOException e) {
264       ResponseConverter.setControllerException(controller, e);
265     } finally {
266       if (scanner != null) {
267         try {
268           scanner.close();
269         } catch (IOException ignored) {}
270       }
271     }
272     log.info("Row counter from this region is "
273         + env.getRegion().getRegionNameAsString() + ": " + counter);
274     done.run(response);
275   }
276 
277   /**
278    * Gives a Pair with first object as Sum and second object as row count,
279    * computed for a given combination of column qualifier and column family in
280    * the given row range as defined in the Scan object. In its current
281    * implementation, it takes one column family and one column qualifier (if
282    * provided). In case of null column qualifier, an aggregate sum over all the
283    * entire column family will be returned.
284    * <p>
285    * The average is computed in
286    * AggregationClient#avg(byte[], ColumnInterpreter, Scan) by
287    * processing results from all regions, so its "ok" to pass sum and a Long
288    * type.
289    */
290   @Override
291   public void getAvg(RpcController controller, AggregateRequest request,
292       RpcCallback<AggregateResponse> done) {
293     AggregateResponse response = null;
294     InternalScanner scanner = null;
295     try {
296       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
297       S sumVal = null;
298       Long rowCountVal = 0l;
299       Scan scan = ProtobufUtil.toScan(request.getScan());
300       scanner = env.getRegion().getScanner(scan);
301       byte[] colFamily = scan.getFamilies()[0];
302       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
303       byte[] qualifier = null;
304       if (qualifiers != null && !qualifiers.isEmpty()) {
305         qualifier = qualifiers.pollFirst();
306       }
307       List<Cell> results = new ArrayList<Cell>();
308       boolean hasMoreRows = false;
309     
310       do {
311         results.clear();
312         hasMoreRows = scanner.next(results);
313         for (Cell kv : results) {
314           sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
315               qualifier, kv)));
316         }
317         rowCountVal++;
318       } while (hasMoreRows);
319       if (sumVal != null) {
320         ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
321         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
322         pair.addFirstPart(first);
323         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
324         bb.rewind();
325         pair.setSecondPart(ByteString.copyFrom(bb));
326         response = pair.build();
327       }
328     } catch (IOException e) {
329       ResponseConverter.setControllerException(controller, e);
330     } finally {
331       if (scanner != null) {
332         try {
333           scanner.close();
334         } catch (IOException ignored) {}
335       }
336     }
337     done.run(response);
338   }
339 
340   /**
341    * Gives a Pair with first object a List containing Sum and sum of squares,
342    * and the second object as row count. It is computed for a given combination of
343    * column qualifier and column family in the given row range as defined in the
344    * Scan object. In its current implementation, it takes one column family and
345    * one column qualifier (if provided). The idea is get the value of variance first:
346    * the average of the squares less the square of the average a standard
347    * deviation is square root of variance.
348    */
349   @Override
350   public void getStd(RpcController controller, AggregateRequest request,
351       RpcCallback<AggregateResponse> done) {
352     InternalScanner scanner = null;
353     AggregateResponse response = null;
354     try {
355       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
356       S sumVal = null, sumSqVal = null, tempVal = null;
357       long rowCountVal = 0l;
358       Scan scan = ProtobufUtil.toScan(request.getScan());
359       scanner = env.getRegion().getScanner(scan);
360       byte[] colFamily = scan.getFamilies()[0];
361       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
362       byte[] qualifier = null;
363       if (qualifiers != null && !qualifiers.isEmpty()) {
364         qualifier = qualifiers.pollFirst();
365       }
366       List<Cell> results = new ArrayList<Cell>();
367 
368       boolean hasMoreRows = false;
369     
370       do {
371         tempVal = null;
372         hasMoreRows = scanner.next(results);
373         for (Cell kv : results) {
374           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
375               qualifier, kv)));
376         }
377         results.clear();
378         sumVal = ci.add(sumVal, tempVal);
379         sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
380         rowCountVal++;
381       } while (hasMoreRows);
382       if (sumVal != null) {
383         ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
384         ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
385         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
386         pair.addFirstPart(first_sumVal);
387         pair.addFirstPart(first_sumSqVal);
388         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
389         bb.rewind();
390         pair.setSecondPart(ByteString.copyFrom(bb));
391         response = pair.build();
392       }
393     } catch (IOException e) {
394       ResponseConverter.setControllerException(controller, e);
395     } finally {
396       if (scanner != null) {
397         try {
398           scanner.close();
399         } catch (IOException ignored) {}
400       }
401     }
402     done.run(response);
403   }
404 
405   /**
406    * Gives a List containing sum of values and sum of weights.
407    * It is computed for the combination of column
408    * family and column qualifier(s) in the given row range as defined in the
409    * Scan object. In its current implementation, it takes one column family and
410    * two column qualifiers. The first qualifier is for values column and 
411    * the second qualifier (optional) is for weight column.
412    */
413   @Override
414   public void getMedian(RpcController controller, AggregateRequest request,
415       RpcCallback<AggregateResponse> done) {
416     AggregateResponse response = null;
417     InternalScanner scanner = null;
418     try {
419       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
420       S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
421       Scan scan = ProtobufUtil.toScan(request.getScan());
422       scanner = env.getRegion().getScanner(scan);
423       byte[] colFamily = scan.getFamilies()[0];
424       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
425       byte[] valQualifier = null, weightQualifier = null;
426       if (qualifiers != null && !qualifiers.isEmpty()) {
427         valQualifier = qualifiers.pollFirst();
428         // if weighted median is requested, get qualifier for the weight column
429         weightQualifier = qualifiers.pollLast();
430       }
431       List<Cell> results = new ArrayList<Cell>();
432 
433       boolean hasMoreRows = false;
434     
435       do {
436         tempVal = null;
437         tempWeight = null;
438         hasMoreRows = scanner.next(results);
439         for (Cell kv : results) {
440           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
441               valQualifier, kv)));
442           if (weightQualifier != null) {
443             tempWeight = ci.add(tempWeight,
444                 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
445           }
446         }
447         results.clear();
448         sumVal = ci.add(sumVal, tempVal);
449         sumWeights = ci.add(sumWeights, tempWeight);
450       } while (hasMoreRows);
451       ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
452       S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
453       ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
454       AggregateResponse.Builder pair = AggregateResponse.newBuilder();
455       pair.addFirstPart(first_sumVal);
456       pair.addFirstPart(first_sumWeights); 
457       response = pair.build();
458     } catch (IOException e) {
459       ResponseConverter.setControllerException(controller, e);
460     } finally {
461       if (scanner != null) {
462         try {
463           scanner.close();
464         } catch (IOException ignored) {}
465       }
466     }
467     done.run(response);
468   }
469 
470   @SuppressWarnings("unchecked")
471   ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
472       AggregateRequest request) throws IOException {
473     String className = request.getInterpreterClassName();
474     Class<?> cls;
475     try {
476       cls = Class.forName(className);
477       ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
478       if (request.hasInterpreterSpecificBytes()) {
479         ByteString b = request.getInterpreterSpecificBytes();
480         P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
481         ci.initialize(initMsg);
482       }
483       return ci;
484     } catch (ClassNotFoundException e) {
485       throw new IOException(e);
486     } catch (InstantiationException e) {
487       throw new IOException(e);
488     } catch (IllegalAccessException e) {
489       throw new IOException(e);
490     }
491   }
492 
  /**
   * Returns this object as the protobuf {@link Service} to expose; this class is
   * itself the {@code AggregateService} implementation.
   * @return this instance
   */
  @Override
  public Service getService() {
    return this;
  }
497 
498   /**
499    * Stores a reference to the coprocessor environment provided by the
500    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
501    * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
502    * on a table region, so always expects this to be an instance of
503    * {@link RegionCoprocessorEnvironment}.
504    * @param env the environment provided by the coprocessor host
505    * @throws IOException if the provided environment is not an instance of
506    * {@code RegionCoprocessorEnvironment}
507    */
508   @Override
509   public void start(CoprocessorEnvironment env) throws IOException {
510     if (env instanceof RegionCoprocessorEnvironment) {
511       this.env = (RegionCoprocessorEnvironment)env;
512     } else {
513       throw new CoprocessorException("Must be loaded on a table region!");
514     }
515   }
516 
  /**
   * Lifecycle hook invoked when the coprocessor is stopped; this implementation
   * performs no work.
   * @param env the environment provided by the coprocessor host (unused)
   * @throws IOException declared by the overridden method; never thrown here
   */
  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
521   
522 }