001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
021
022import com.google.protobuf.ByteString;
023import com.google.protobuf.Message;
024import com.google.protobuf.RpcCallback;
025import com.google.protobuf.RpcController;
026import com.google.protobuf.Service;
027
028import java.io.IOException;
029import java.lang.reflect.InvocationTargetException;
030import java.nio.ByteBuffer;
031import java.util.ArrayList;
032import java.util.Collections;
033import java.util.List;
034import java.util.NavigableSet;
035
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CoprocessorEnvironment;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
040import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
041import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
042import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
043import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
044import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
045import org.apache.hadoop.hbase.regionserver.InternalScanner;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * A concrete AggregateProtocol implementation. Its system level coprocessor
052 * that computes the aggregate function at a region level.
053 * {@link ColumnInterpreter} is used to interpret column value. This class is
054 * parameterized with the following (these are the types with which the {@link ColumnInterpreter}
055 * is parameterized, and for more description on these, refer to {@link ColumnInterpreter}):
056 * @param T Cell value data type
057 * @param S Promoted data type
058 * @param P PB message that is used to transport initializer specific bytes
059 * @param Q PB message that is used to transport Cell (<T>) instance
060 * @param R PB message that is used to transport Promoted (<S>) instance
061 */
062@InterfaceAudience.Private
063public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
064  extends AggregateService implements RegionCoprocessor {
065  protected static final Logger log = LoggerFactory.getLogger(AggregateImplementation.class);
066  private RegionCoprocessorEnvironment env;
067
068  /**
069   * Gives the maximum for a given combination of column qualifier and column
070   * family, in the given row range as defined in the Scan object. In its
071   * current implementation, it takes one column family and one column qualifier
072   * (if provided). In case of null column qualifier, maximum value for the
073   * entire column family will be returned.
074   */
075  @Override
076  public void getMax(RpcController controller, AggregateRequest request,
077          RpcCallback<AggregateResponse> done) {
078    InternalScanner scanner = null;
079    AggregateResponse response = null;
080    T max = null;
081    try {
082      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
083      T temp;
084      Scan scan = ProtobufUtil.toScan(request.getScan());
085      scanner = env.getRegion().getScanner(scan);
086      List<Cell> results = new ArrayList<>();
087      byte[] colFamily = scan.getFamilies()[0];
088      NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
089      byte[] qualifier = null;
090      if (qualifiers != null && !qualifiers.isEmpty()) {
091        qualifier = qualifiers.pollFirst();
092      }
093      // qualifier can be null.
094      boolean hasMoreRows = false;
095      do {
096        hasMoreRows = scanner.next(results);
097        int listSize = results.size();
098        for (int i = 0; i < listSize; i++) {
099          temp = ci.getValue(colFamily, qualifier, results.get(i));
100          max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
101        }
102        results.clear();
103      } while (hasMoreRows);
104      if (max != null) {
105        AggregateResponse.Builder builder = AggregateResponse.newBuilder();
106        builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
107        response = builder.build();
108      }
109    } catch (IOException e) {
110      CoprocessorRpcUtils.setControllerException(controller, e);
111    } finally {
112      if (scanner != null) {
113        try {
114          scanner.close();
115        } catch (IOException ignored) {}
116      }
117    }
118    log.info("Maximum from this region is "
119        + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + max);
120    done.run(response);
121  }
122
123  /**
124   * Gives the minimum for a given combination of column qualifier and column
125   * family, in the given row range as defined in the Scan object. In its
126   * current implementation, it takes one column family and one column qualifier
127   * (if provided). In case of null column qualifier, minimum value for the
128   * entire column family will be returned.
129   */
130  @Override
131  public void getMin(RpcController controller, AggregateRequest request,
132          RpcCallback<AggregateResponse> done) {
133    AggregateResponse response = null;
134    InternalScanner scanner = null;
135    T min = null;
136    try {
137      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
138      T temp;
139      Scan scan = ProtobufUtil.toScan(request.getScan());
140      scanner = env.getRegion().getScanner(scan);
141      List<Cell> results = new ArrayList<>();
142      byte[] colFamily = scan.getFamilies()[0];
143      NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
144      byte[] qualifier = null;
145      if (qualifiers != null && !qualifiers.isEmpty()) {
146        qualifier = qualifiers.pollFirst();
147      }
148      boolean hasMoreRows = false;
149      do {
150        hasMoreRows = scanner.next(results);
151        int listSize = results.size();
152        for (int i = 0; i < listSize; i++) {
153          temp = ci.getValue(colFamily, qualifier, results.get(i));
154          min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
155        }
156        results.clear();
157      } while (hasMoreRows);
158      if (min != null) {
159        response = AggregateResponse.newBuilder().addFirstPart(
160          ci.getProtoForCellType(min).toByteString()).build();
161      }
162    } catch (IOException e) {
163      CoprocessorRpcUtils.setControllerException(controller, e);
164    } finally {
165      if (scanner != null) {
166        try {
167          scanner.close();
168        } catch (IOException ignored) {}
169      }
170    }
171    log.info("Minimum from this region is "
172        + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + min);
173    done.run(response);
174  }
175
176  /**
177   * Gives the sum for a given combination of column qualifier and column
178   * family, in the given row range as defined in the Scan object. In its
179   * current implementation, it takes one column family and one column qualifier
180   * (if provided). In case of null column qualifier, sum for the entire column
181   * family will be returned.
182   */
183  @Override
184  public void getSum(RpcController controller, AggregateRequest request,
185          RpcCallback<AggregateResponse> done) {
186    AggregateResponse response = null;
187    InternalScanner scanner = null;
188    long sum = 0L;
189    try {
190      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
191      S sumVal = null;
192      T temp;
193      Scan scan = ProtobufUtil.toScan(request.getScan());
194      scanner = env.getRegion().getScanner(scan);
195      byte[] colFamily = scan.getFamilies()[0];
196      NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
197      byte[] qualifier = null;
198      if (qualifiers != null && !qualifiers.isEmpty()) {
199        qualifier = qualifiers.pollFirst();
200      }
201      List<Cell> results = new ArrayList<>();
202      boolean hasMoreRows = false;
203      do {
204        hasMoreRows = scanner.next(results);
205        int listSize = results.size();
206        for (int i = 0; i < listSize; i++) {
207          temp = ci.getValue(colFamily, qualifier, results.get(i));
208          if (temp != null) {
209            sumVal = ci.add(sumVal, ci.castToReturnType(temp));
210          }
211        }
212        results.clear();
213      } while (hasMoreRows);
214      if (sumVal != null) {
215        response = AggregateResponse.newBuilder().addFirstPart(
216          ci.getProtoForPromotedType(sumVal).toByteString()).build();
217      }
218    } catch (IOException e) {
219      CoprocessorRpcUtils.setControllerException(controller, e);
220    } finally {
221      if (scanner != null) {
222        try {
223          scanner.close();
224        } catch (IOException ignored) {}
225      }
226    }
227    log.debug("Sum from this region is "
228        + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum);
229    done.run(response);
230  }
231
232  /**
233   * Gives the row count for the given column family and column qualifier, in
234   * the given row range as defined in the Scan object.
235   */
236  @Override
237  public void getRowNum(RpcController controller, AggregateRequest request,
238          RpcCallback<AggregateResponse> done) {
239    AggregateResponse response = null;
240    long counter = 0L;
241    List<Cell> results = new ArrayList<>();
242    InternalScanner scanner = null;
243    try {
244      Scan scan = ProtobufUtil.toScan(request.getScan());
245      byte[][] colFamilies = scan.getFamilies();
246      byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
247      NavigableSet<byte[]> qualifiers = colFamilies != null ?
248          scan.getFamilyMap().get(colFamily) : null;
249      byte[] qualifier = null;
250      if (qualifiers != null && !qualifiers.isEmpty()) {
251        qualifier = qualifiers.pollFirst();
252      }
253      if (scan.getFilter() == null && qualifier == null) {
254        scan.setFilter(new FirstKeyOnlyFilter());
255      }
256      scanner = env.getRegion().getScanner(scan);
257      boolean hasMoreRows = false;
258      do {
259        hasMoreRows = scanner.next(results);
260        if (results.size() > 0) {
261          counter++;
262        }
263        results.clear();
264      } while (hasMoreRows);
265      ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
266      bb.rewind();
267      response = AggregateResponse.newBuilder().addFirstPart(
268          ByteString.copyFrom(bb)).build();
269    } catch (IOException e) {
270      CoprocessorRpcUtils.setControllerException(controller, e);
271    } finally {
272      if (scanner != null) {
273        try {
274          scanner.close();
275        } catch (IOException ignored) {}
276      }
277    }
278    log.info("Row counter from this region is "
279        + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
280    done.run(response);
281  }
282
283  /**
284   * Gives a Pair with first object as Sum and second object as row count,
285   * computed for a given combination of column qualifier and column family in
286   * the given row range as defined in the Scan object. In its current
287   * implementation, it takes one column family and one column qualifier (if
288   * provided). In case of null column qualifier, an aggregate sum over all the
289   * entire column family will be returned.
290   * <p>
291   * The average is computed in
292   * AggregationClient#avg(byte[], ColumnInterpreter, Scan) by
293   * processing results from all regions, so its "ok" to pass sum and a Long
294   * type.
295   */
296  @Override
297  public void getAvg(RpcController controller, AggregateRequest request,
298          RpcCallback<AggregateResponse> done) {
299    AggregateResponse response = null;
300    InternalScanner scanner = null;
301    try {
302      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
303      S sumVal = null;
304      Long rowCountVal = 0L;
305      Scan scan = ProtobufUtil.toScan(request.getScan());
306      scanner = env.getRegion().getScanner(scan);
307      byte[] colFamily = scan.getFamilies()[0];
308      NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
309      byte[] qualifier = null;
310      if (qualifiers != null && !qualifiers.isEmpty()) {
311        qualifier = qualifiers.pollFirst();
312      }
313      List<Cell> results = new ArrayList<>();
314      boolean hasMoreRows = false;
315
316      do {
317        results.clear();
318        hasMoreRows = scanner.next(results);
319        int listSize = results.size();
320        for (int i = 0; i < listSize; i++) {
321          sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
322              qualifier, results.get(i))));
323        }
324        rowCountVal++;
325      } while (hasMoreRows);
326      if (sumVal != null) {
327        ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
328        AggregateResponse.Builder pair = AggregateResponse.newBuilder();
329        pair.addFirstPart(first);
330        ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
331        bb.rewind();
332        pair.setSecondPart(ByteString.copyFrom(bb));
333        response = pair.build();
334      }
335    } catch (IOException e) {
336      CoprocessorRpcUtils.setControllerException(controller, e);
337    } finally {
338      if (scanner != null) {
339        try {
340          scanner.close();
341        } catch (IOException ignored) {}
342      }
343    }
344    done.run(response);
345  }
346
347  /**
348   * Gives a Pair with first object a List containing Sum and sum of squares,
349   * and the second object as row count. It is computed for a given combination of
350   * column qualifier and column family in the given row range as defined in the
351   * Scan object. In its current implementation, it takes one column family and
352   * one column qualifier (if provided). The idea is get the value of variance first:
353   * the average of the squares less the square of the average a standard
354   * deviation is square root of variance.
355   */
356  @Override
357  public void getStd(RpcController controller, AggregateRequest request,
358          RpcCallback<AggregateResponse> done) {
359    InternalScanner scanner = null;
360    AggregateResponse response = null;
361    try {
362      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
363      S sumVal = null, sumSqVal = null, tempVal = null;
364      long rowCountVal = 0L;
365      Scan scan = ProtobufUtil.toScan(request.getScan());
366      scanner = env.getRegion().getScanner(scan);
367      byte[] colFamily = scan.getFamilies()[0];
368      NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
369      byte[] qualifier = null;
370      if (qualifiers != null && !qualifiers.isEmpty()) {
371        qualifier = qualifiers.pollFirst();
372      }
373      List<Cell> results = new ArrayList<>();
374
375      boolean hasMoreRows = false;
376
377      do {
378        tempVal = null;
379        hasMoreRows = scanner.next(results);
380        int listSize = results.size();
381        for (int i = 0; i < listSize; i++) {
382          tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
383              qualifier, results.get(i))));
384        }
385        results.clear();
386        sumVal = ci.add(sumVal, tempVal);
387        sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
388        rowCountVal++;
389      } while (hasMoreRows);
390      if (sumVal != null) {
391        ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
392        ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
393        AggregateResponse.Builder pair = AggregateResponse.newBuilder();
394        pair.addFirstPart(first_sumVal);
395        pair.addFirstPart(first_sumSqVal);
396        ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
397        bb.rewind();
398        pair.setSecondPart(ByteString.copyFrom(bb));
399        response = pair.build();
400      }
401    } catch (IOException e) {
402      CoprocessorRpcUtils.setControllerException(controller, e);
403    } finally {
404      if (scanner != null) {
405        try {
406          scanner.close();
407        } catch (IOException ignored) {}
408      }
409    }
410    done.run(response);
411  }
412
413  /**
414   * Gives a List containing sum of values and sum of weights.
415   * It is computed for the combination of column
416   * family and column qualifier(s) in the given row range as defined in the
417   * Scan object. In its current implementation, it takes one column family and
418   * two column qualifiers. The first qualifier is for values column and
419   * the second qualifier (optional) is for weight column.
420   */
421  @Override
422  public void getMedian(RpcController controller, AggregateRequest request,
423          RpcCallback<AggregateResponse> done) {
424    AggregateResponse response = null;
425    InternalScanner scanner = null;
426    try {
427      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
428      S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
429      Scan scan = ProtobufUtil.toScan(request.getScan());
430      scanner = env.getRegion().getScanner(scan);
431      byte[] colFamily = scan.getFamilies()[0];
432      NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
433      byte[] valQualifier = null, weightQualifier = null;
434      if (qualifiers != null && !qualifiers.isEmpty()) {
435        valQualifier = qualifiers.pollFirst();
436        // if weighted median is requested, get qualifier for the weight column
437        weightQualifier = qualifiers.pollLast();
438      }
439      List<Cell> results = new ArrayList<>();
440
441      boolean hasMoreRows = false;
442
443      do {
444        tempVal = null;
445        tempWeight = null;
446        hasMoreRows = scanner.next(results);
447        int listSize = results.size();
448        for (int i = 0; i < listSize; i++) {
449          Cell kv = results.get(i);
450          tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
451              valQualifier, kv)));
452          if (weightQualifier != null) {
453            tempWeight = ci.add(tempWeight,
454                ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
455          }
456        }
457        results.clear();
458        sumVal = ci.add(sumVal, tempVal);
459        sumWeights = ci.add(sumWeights, tempWeight);
460      } while (hasMoreRows);
461      ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
462      S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
463      ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
464      AggregateResponse.Builder pair = AggregateResponse.newBuilder();
465      pair.addFirstPart(first_sumVal);
466      pair.addFirstPart(first_sumWeights);
467      response = pair.build();
468    } catch (IOException e) {
469      CoprocessorRpcUtils.setControllerException(controller, e);
470    } finally {
471      if (scanner != null) {
472        try {
473          scanner.close();
474        } catch (IOException ignored) {}
475      }
476    }
477    done.run(response);
478  }
479
480  @SuppressWarnings("unchecked")
481  // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
482  ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
483      AggregateRequest request) throws IOException {
484    String className = request.getInterpreterClassName();
485    try {
486      ColumnInterpreter<T,S,P,Q,R> ci;
487      Class<?> cls = Class.forName(className);
488      ci = (ColumnInterpreter<T, S, P, Q, R>) cls.getDeclaredConstructor().newInstance();
489
490      if (request.hasInterpreterSpecificBytes()) {
491        ByteString b = request.getInterpreterSpecificBytes();
492        P initMsg = getParsedGenericInstance(ci.getClass(), 2, b);
493        ci.initialize(initMsg);
494      }
495      return ci;
496    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
497        NoSuchMethodException | InvocationTargetException e) {
498      throw new IOException(e);
499    }
500  }
501
502  @Override
503  public Iterable<Service> getServices() {
504    return Collections.singleton(this);
505  }
506
507  /**
508   * Stores a reference to the coprocessor environment provided by the
509   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
510   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
511   * on a table region, so always expects this to be an instance of
512   * {@link RegionCoprocessorEnvironment}.
513   * @param env the environment provided by the coprocessor host
514   * @throws IOException if the provided environment is not an instance of
515   * {@code RegionCoprocessorEnvironment}
516   */
517  @Override
518  public void start(CoprocessorEnvironment env) throws IOException {
519    if (env instanceof RegionCoprocessorEnvironment) {
520      this.env = (RegionCoprocessorEnvironment)env;
521    } else {
522      throw new CoprocessorException("Must be loaded on a table region!");
523    }
524  }
525
526  @Override
527  public void stop(CoprocessorEnvironment env) throws IOException {
528    // nothing to do
529  }
530}