001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import com.google.protobuf.RpcCallback;
021import com.google.protobuf.RpcController;
022import com.google.protobuf.Service;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.CoprocessorEnvironment;
032import org.apache.hadoop.hbase.client.Scan;
033import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
034import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
035import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
036import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
037import org.apache.hadoop.hbase.regionserver.InternalScanner;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * The aggregation implementation at a region.
044 */
045public class ColumnAggregationEndpoint extends ColumnAggregationService
046        implements RegionCoprocessor {
047  private static final Logger LOG = LoggerFactory.getLogger(ColumnAggregationEndpoint.class);
048  private RegionCoprocessorEnvironment env = null;
049
050  @Override
051  public Iterable<Service> getServices() {
052    return Collections.singleton(this);
053  }
054
055  @Override
056  public void start(CoprocessorEnvironment env) throws IOException {
057    if (env instanceof RegionCoprocessorEnvironment) {
058      this.env = (RegionCoprocessorEnvironment)env;
059      return;
060    }
061    throw new CoprocessorException("Must be loaded on a table region!");
062  }
063
064  @Override
065  public void stop(CoprocessorEnvironment env) throws IOException {
066    // Nothing to do.
067  }
068
069  @Override
070  public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
071    // aggregate at each region
072    Scan scan = new Scan();
073    // Family is required in pb. Qualifier is not.
074    byte [] family = request.getFamily().toByteArray();
075    byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null;
076    if (request.hasQualifier()) {
077      scan.addColumn(family, qualifier);
078    } else {
079      scan.addFamily(family);
080    }
081    int sumResult = 0;
082    InternalScanner scanner = null;
083    try {
084      scanner = this.env.getRegion().getScanner(scan);
085      List<Cell> curVals = new ArrayList<>();
086      boolean hasMore = false;
087      do {
088        curVals.clear();
089        hasMore = scanner.next(curVals);
090        for (Cell kv : curVals) {
091          if (CellUtil.matchingQualifier(kv, qualifier)) {
092            sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
093          }
094        }
095      } while (hasMore);
096    } catch (IOException e) {
097      CoprocessorRpcUtils.setControllerException(controller, e);
098      // Set result to -1 to indicate error.
099      sumResult = -1;
100      LOG.info("Setting sum result to -1 to indicate error", e);
101    } finally {
102      if (scanner != null) {
103        try {
104          scanner.close();
105        } catch (IOException e) {
106          CoprocessorRpcUtils.setControllerException(controller, e);
107          sumResult = -1;
108          LOG.info("Setting sum result to -1 to indicate error", e);
109        }
110      }
111    }
112    LOG.info("Returning result " + sumResult);
113    done.run(SumResponse.newBuilder().setSum(sumResult).build());
114  }
115}