View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.NavigableSet;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.Coprocessor;
31  import org.apache.hadoop.hbase.CoprocessorEnvironment;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.client.Scan;
34  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
38  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
39  import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
40  import org.apache.hadoop.hbase.regionserver.InternalScanner;
41  
42  import com.google.protobuf.ByteString;
43  import com.google.protobuf.Message;
44  import com.google.protobuf.RpcCallback;
45  import com.google.protobuf.RpcController;
46  import com.google.protobuf.Service;
47  
48  /**
49   * A concrete AggregateProtocol implementation. It is a system-level coprocessor
50   * that computes the aggregate function at a region level.
51   * {@link ColumnInterpreter} is used to interpret column value. This class is
52   * parameterized with the following (these are the types with which the {@link ColumnInterpreter}
53   * is parameterized, and for more description on these, refer to {@link ColumnInterpreter}):
54   * @param <T> Cell value data type
55   * @param <S> Promoted data type
56   * @param <P> PB message that is used to transport initializer specific bytes
57   * @param <Q> PB message that is used to transport Cell (<T>) instance
58   * @param <R> PB message that is used to transport Promoted (<S>) instance
59   */
60  @InterfaceAudience.Private
61  public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message> 
62  extends AggregateService implements CoprocessorService, Coprocessor {
63    protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
64    private RegionCoprocessorEnvironment env;
65  
66    /**
67     * Gives the maximum for a given combination of column qualifier and column
68     * family, in the given row range as defined in the Scan object. In its
69     * current implementation, it takes one column family and one column qualifier
70     * (if provided). In case of null column qualifier, maximum value for the
71     * entire column family will be returned.
72     */
73    @Override
74    public void getMax(RpcController controller, AggregateRequest request,
75        RpcCallback<AggregateResponse> done) {
76      InternalScanner scanner = null;
77      AggregateResponse response = null;
78      T max = null;
79      try {
80        ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
81        T temp;
82        Scan scan = ProtobufUtil.toScan(request.getScan());
83        scanner = env.getRegion().getScanner(scan);
84        List<Cell> results = new ArrayList<Cell>();
85        byte[] colFamily = scan.getFamilies()[0];
86        NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
87        byte[] qualifier = null;
88        if (qualifiers != null && !qualifiers.isEmpty()) {
89          qualifier = qualifiers.pollFirst();
90        }
91        // qualifier can be null.
92        boolean hasMoreRows = false;
93        do {
94          hasMoreRows = scanner.next(results);
95          int listSize = results.size();
96          for (int i = 0; i < listSize; i++) {
97            temp = ci.getValue(colFamily, qualifier, results.get(i));
98            max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
99          }
100         results.clear();
101       } while (hasMoreRows);
102       if (max != null) {
103         AggregateResponse.Builder builder = AggregateResponse.newBuilder();
104         builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
105         response = builder.build();
106       }
107     } catch (IOException e) {
108       ResponseConverter.setControllerException(controller, e);
109     } finally {
110       if (scanner != null) {
111         try {
112           scanner.close();
113         } catch (IOException ignored) {}
114       }
115     }
116     log.info("Maximum from this region is "
117         + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + max);
118     done.run(response);
119   }
120 
121   /**
122    * Gives the minimum for a given combination of column qualifier and column
123    * family, in the given row range as defined in the Scan object. In its
124    * current implementation, it takes one column family and one column qualifier
125    * (if provided). In case of null column qualifier, minimum value for the
126    * entire column family will be returned.
127    */
128   @Override
129   public void getMin(RpcController controller, AggregateRequest request,
130       RpcCallback<AggregateResponse> done) {
131     AggregateResponse response = null;
132     InternalScanner scanner = null;
133     T min = null;
134     try {
135       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
136       T temp;
137       Scan scan = ProtobufUtil.toScan(request.getScan());
138       scanner = env.getRegion().getScanner(scan);
139       List<Cell> results = new ArrayList<Cell>();
140       byte[] colFamily = scan.getFamilies()[0];
141       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
142       byte[] qualifier = null;
143       if (qualifiers != null && !qualifiers.isEmpty()) {
144         qualifier = qualifiers.pollFirst();
145       }
146       boolean hasMoreRows = false;
147       do {
148         hasMoreRows = scanner.next(results);
149         int listSize = results.size();
150         for (int i = 0; i < listSize; i++) {
151           temp = ci.getValue(colFamily, qualifier, results.get(i));
152           min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
153         }
154         results.clear();
155       } while (hasMoreRows);
156       if (min != null) {
157         response = AggregateResponse.newBuilder().addFirstPart( 
158           ci.getProtoForCellType(min).toByteString()).build();
159       }
160     } catch (IOException e) {
161       ResponseConverter.setControllerException(controller, e);
162     } finally {
163       if (scanner != null) {
164         try {
165           scanner.close();
166         } catch (IOException ignored) {}
167       }
168     }
169     log.info("Minimum from this region is "
170         + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + min);
171     done.run(response);
172   }
173 
174   /**
175    * Gives the sum for a given combination of column qualifier and column
176    * family, in the given row range as defined in the Scan object. In its
177    * current implementation, it takes one column family and one column qualifier
178    * (if provided). In case of null column qualifier, sum for the entire column
179    * family will be returned.
180    */
181   @Override
182   public void getSum(RpcController controller, AggregateRequest request,
183       RpcCallback<AggregateResponse> done) {
184     AggregateResponse response = null;
185     InternalScanner scanner = null;
186     long sum = 0l;
187     try {
188       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
189       S sumVal = null;
190       T temp;
191       Scan scan = ProtobufUtil.toScan(request.getScan());
192       scanner = env.getRegion().getScanner(scan);
193       byte[] colFamily = scan.getFamilies()[0];
194       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
195       byte[] qualifier = null;
196       if (qualifiers != null && !qualifiers.isEmpty()) {
197         qualifier = qualifiers.pollFirst();
198       }
199       List<Cell> results = new ArrayList<Cell>();
200       boolean hasMoreRows = false;
201       do {
202         hasMoreRows = scanner.next(results);
203         int listSize = results.size();
204         for (int i = 0; i < listSize; i++) {
205           temp = ci.getValue(colFamily, qualifier, results.get(i));
206           if (temp != null)
207             sumVal = ci.add(sumVal, ci.castToReturnType(temp));
208         }
209         results.clear();
210       } while (hasMoreRows);
211       if (sumVal != null) {
212         response = AggregateResponse.newBuilder().addFirstPart( 
213           ci.getProtoForPromotedType(sumVal).toByteString()).build();
214       }
215     } catch (IOException e) {
216       ResponseConverter.setControllerException(controller, e);
217     } finally {
218       if (scanner != null) {
219         try {
220           scanner.close();
221         } catch (IOException ignored) {}
222       }
223     }
224     log.debug("Sum from this region is "
225         + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum);
226     done.run(response);
227   }
228 
229   /**
230    * Gives the row count for the given column family and column qualifier, in
231    * the given row range as defined in the Scan object.
232    * @throws IOException
233    */
234   @Override
235   public void getRowNum(RpcController controller, AggregateRequest request,
236       RpcCallback<AggregateResponse> done) {
237     AggregateResponse response = null;
238     long counter = 0l;
239     List<Cell> results = new ArrayList<Cell>();
240     InternalScanner scanner = null;
241     try {
242       Scan scan = ProtobufUtil.toScan(request.getScan());
243       byte[][] colFamilies = scan.getFamilies();
244       byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
245       NavigableSet<byte[]> qualifiers = colFamilies != null ?
246           scan.getFamilyMap().get(colFamily) : null;
247       byte[] qualifier = null;
248       if (qualifiers != null && !qualifiers.isEmpty()) {
249         qualifier = qualifiers.pollFirst();
250       }
251       if (scan.getFilter() == null && qualifier == null)
252         scan.setFilter(new FirstKeyOnlyFilter());
253       scanner = env.getRegion().getScanner(scan);
254       boolean hasMoreRows = false;
255       do {
256         hasMoreRows = scanner.next(results);
257         if (results.size() > 0) {
258           counter++;
259         }
260         results.clear();
261       } while (hasMoreRows);
262       ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
263       bb.rewind();
264       response = AggregateResponse.newBuilder().addFirstPart( 
265           ByteString.copyFrom(bb)).build();
266     } catch (IOException e) {
267       ResponseConverter.setControllerException(controller, e);
268     } finally {
269       if (scanner != null) {
270         try {
271           scanner.close();
272         } catch (IOException ignored) {}
273       }
274     }
275     log.info("Row counter from this region is "
276         + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
277     done.run(response);
278   }
279 
280   /**
281    * Gives a Pair with first object as Sum and second object as row count,
282    * computed for a given combination of column qualifier and column family in
283    * the given row range as defined in the Scan object. In its current
284    * implementation, it takes one column family and one column qualifier (if
285    * provided). In case of null column qualifier, an aggregate sum over all the
286    * entire column family will be returned.
287    * <p>
288    * The average is computed in
289    * AggregationClient#avg(byte[], ColumnInterpreter, Scan) by
290    * processing results from all regions, so its "ok" to pass sum and a Long
291    * type.
292    */
293   @Override
294   public void getAvg(RpcController controller, AggregateRequest request,
295       RpcCallback<AggregateResponse> done) {
296     AggregateResponse response = null;
297     InternalScanner scanner = null;
298     try {
299       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
300       S sumVal = null;
301       Long rowCountVal = 0l;
302       Scan scan = ProtobufUtil.toScan(request.getScan());
303       scanner = env.getRegion().getScanner(scan);
304       byte[] colFamily = scan.getFamilies()[0];
305       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
306       byte[] qualifier = null;
307       if (qualifiers != null && !qualifiers.isEmpty()) {
308         qualifier = qualifiers.pollFirst();
309       }
310       List<Cell> results = new ArrayList<Cell>();
311       boolean hasMoreRows = false;
312     
313       do {
314         results.clear();
315         hasMoreRows = scanner.next(results);
316         int listSize = results.size();
317         for (int i = 0; i < listSize; i++) {
318           sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
319               qualifier, results.get(i))));
320         }
321         rowCountVal++;
322       } while (hasMoreRows);
323       if (sumVal != null) {
324         ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
325         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
326         pair.addFirstPart(first);
327         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
328         bb.rewind();
329         pair.setSecondPart(ByteString.copyFrom(bb));
330         response = pair.build();
331       }
332     } catch (IOException e) {
333       ResponseConverter.setControllerException(controller, e);
334     } finally {
335       if (scanner != null) {
336         try {
337           scanner.close();
338         } catch (IOException ignored) {}
339       }
340     }
341     done.run(response);
342   }
343 
344   /**
345    * Gives a Pair with first object a List containing Sum and sum of squares,
346    * and the second object as row count. It is computed for a given combination of
347    * column qualifier and column family in the given row range as defined in the
348    * Scan object. In its current implementation, it takes one column family and
349    * one column qualifier (if provided). The idea is get the value of variance first:
350    * the average of the squares less the square of the average a standard
351    * deviation is square root of variance.
352    */
353   @Override
354   public void getStd(RpcController controller, AggregateRequest request,
355       RpcCallback<AggregateResponse> done) {
356     InternalScanner scanner = null;
357     AggregateResponse response = null;
358     try {
359       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
360       S sumVal = null, sumSqVal = null, tempVal = null;
361       long rowCountVal = 0l;
362       Scan scan = ProtobufUtil.toScan(request.getScan());
363       scanner = env.getRegion().getScanner(scan);
364       byte[] colFamily = scan.getFamilies()[0];
365       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
366       byte[] qualifier = null;
367       if (qualifiers != null && !qualifiers.isEmpty()) {
368         qualifier = qualifiers.pollFirst();
369       }
370       List<Cell> results = new ArrayList<Cell>();
371 
372       boolean hasMoreRows = false;
373     
374       do {
375         tempVal = null;
376         hasMoreRows = scanner.next(results);
377         int listSize = results.size();
378         for (int i = 0; i < listSize; i++) {
379           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
380               qualifier, results.get(i))));
381         }
382         results.clear();
383         sumVal = ci.add(sumVal, tempVal);
384         sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
385         rowCountVal++;
386       } while (hasMoreRows);
387       if (sumVal != null) {
388         ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
389         ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
390         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
391         pair.addFirstPart(first_sumVal);
392         pair.addFirstPart(first_sumSqVal);
393         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
394         bb.rewind();
395         pair.setSecondPart(ByteString.copyFrom(bb));
396         response = pair.build();
397       }
398     } catch (IOException e) {
399       ResponseConverter.setControllerException(controller, e);
400     } finally {
401       if (scanner != null) {
402         try {
403           scanner.close();
404         } catch (IOException ignored) {}
405       }
406     }
407     done.run(response);
408   }
409 
410   /**
411    * Gives a List containing sum of values and sum of weights.
412    * It is computed for the combination of column
413    * family and column qualifier(s) in the given row range as defined in the
414    * Scan object. In its current implementation, it takes one column family and
415    * two column qualifiers. The first qualifier is for values column and 
416    * the second qualifier (optional) is for weight column.
417    */
418   @Override
419   public void getMedian(RpcController controller, AggregateRequest request,
420       RpcCallback<AggregateResponse> done) {
421     AggregateResponse response = null;
422     InternalScanner scanner = null;
423     try {
424       ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
425       S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
426       Scan scan = ProtobufUtil.toScan(request.getScan());
427       scanner = env.getRegion().getScanner(scan);
428       byte[] colFamily = scan.getFamilies()[0];
429       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
430       byte[] valQualifier = null, weightQualifier = null;
431       if (qualifiers != null && !qualifiers.isEmpty()) {
432         valQualifier = qualifiers.pollFirst();
433         // if weighted median is requested, get qualifier for the weight column
434         weightQualifier = qualifiers.pollLast();
435       }
436       List<Cell> results = new ArrayList<Cell>();
437 
438       boolean hasMoreRows = false;
439     
440       do {
441         tempVal = null;
442         tempWeight = null;
443         hasMoreRows = scanner.next(results);
444         int listSize = results.size();
445         for (int i = 0; i < listSize; i++) {
446           Cell kv = results.get(i);
447           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
448               valQualifier, kv)));
449           if (weightQualifier != null) {
450             tempWeight = ci.add(tempWeight,
451                 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
452           }
453         }
454         results.clear();
455         sumVal = ci.add(sumVal, tempVal);
456         sumWeights = ci.add(sumWeights, tempWeight);
457       } while (hasMoreRows);
458       ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
459       S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
460       ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
461       AggregateResponse.Builder pair = AggregateResponse.newBuilder();
462       pair.addFirstPart(first_sumVal);
463       pair.addFirstPart(first_sumWeights); 
464       response = pair.build();
465     } catch (IOException e) {
466       ResponseConverter.setControllerException(controller, e);
467     } finally {
468       if (scanner != null) {
469         try {
470           scanner.close();
471         } catch (IOException ignored) {}
472       }
473     }
474     done.run(response);
475   }
476 
477   @SuppressWarnings("unchecked")
478   ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
479       AggregateRequest request) throws IOException {
480     String className = request.getInterpreterClassName();
481     Class<?> cls;
482     try {
483       cls = Class.forName(className);
484       ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
485       if (request.hasInterpreterSpecificBytes()) {
486         ByteString b = request.getInterpreterSpecificBytes();
487         P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
488         ci.initialize(initMsg);
489       }
490       return ci;
491     } catch (ClassNotFoundException e) {
492       throw new IOException(e);
493     } catch (InstantiationException e) {
494       throw new IOException(e);
495     } catch (IllegalAccessException e) {
496       throw new IOException(e);
497     }
498   }
499 
500   @Override
501   public Service getService() {
502     return this;
503   }
504 
505   /**
506    * Stores a reference to the coprocessor environment provided by the
507    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
508    * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
509    * on a table region, so always expects this to be an instance of
510    * {@link RegionCoprocessorEnvironment}.
511    * @param env the environment provided by the coprocessor host
512    * @throws IOException if the provided environment is not an instance of
513    * {@code RegionCoprocessorEnvironment}
514    */
515   @Override
516   public void start(CoprocessorEnvironment env) throws IOException {
517     if (env instanceof RegionCoprocessorEnvironment) {
518       this.env = (RegionCoprocessorEnvironment)env;
519     } else {
520       throw new CoprocessorException("Must be loaded on a table region!");
521     }
522   }
523 
524   @Override
525   public void stop(CoprocessorEnvironment env) throws IOException {
526     // nothing to do
527   }
528   
529 }