001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import com.google.protobuf.RpcCallback;
021import com.google.protobuf.RpcController;
022import com.google.protobuf.Service;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.CoprocessorEnvironment;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest;
035import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse;
036import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
037import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
038import org.apache.hadoop.hbase.regionserver.InternalScanner;
039import org.apache.hadoop.hbase.regionserver.Region;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Test coprocessor endpoint that always returns {@code null} for requests to the last region
046 * in the table.  This allows tests to provide assurance of correct {@code null} handling for
047 * response values.
048 */
049public class ColumnAggregationEndpointNullResponse extends ColumnAggregationServiceNullResponse
050        implements RegionCoprocessor {
051  private static final Logger LOG =
052      LoggerFactory.getLogger(ColumnAggregationEndpointNullResponse.class);
053
054  private RegionCoprocessorEnvironment env = null;
055
056  @Override
057  public Iterable<Service> getServices() {
058    return Collections.singleton(this);
059  }
060
061  @Override
062  public void start(CoprocessorEnvironment env) throws IOException {
063    if (env instanceof RegionCoprocessorEnvironment) {
064      this.env = (RegionCoprocessorEnvironment)env;
065      return;
066    }
067    throw new CoprocessorException("Must be loaded on a table region!");
068  }
069
070  @Override
071  public void stop(CoprocessorEnvironment env) throws IOException {
072    // Nothing to do.
073  }
074
075  @Override
076  public void sum(RpcController controller, ColumnAggregationNullResponseSumRequest request,
077       RpcCallback<ColumnAggregationNullResponseSumResponse> done) {
078    // aggregate at each region
079    Scan scan = new Scan();
080    // Family is required in pb. Qualifier is not.
081    byte[] family = request.getFamily().toByteArray();
082    byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
083    if (request.hasQualifier()) {
084      scan.addColumn(family, qualifier);
085    } else {
086      scan.addFamily(family);
087    }
088    int sumResult = 0;
089    InternalScanner scanner = null;
090    try {
091      Region region = this.env.getRegion();
092      // for the last region in the table, return null to test null handling
093      if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
094        done.run(null);
095        return;
096      }
097      scanner = region.getScanner(scan);
098      List<Cell> curVals = new ArrayList<>();
099      boolean hasMore = false;
100      do {
101        curVals.clear();
102        hasMore = scanner.next(curVals);
103        for (Cell kv : curVals) {
104          if (CellUtil.matchingQualifier(kv, qualifier)) {
105            sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
106          }
107        }
108      } while (hasMore);
109    } catch (IOException e) {
110      CoprocessorRpcUtils.setControllerException(controller, e);
111      // Set result to -1 to indicate error.
112      sumResult = -1;
113      LOG.info("Setting sum result to -1 to indicate error", e);
114    } finally {
115      if (scanner != null) {
116        try {
117          scanner.close();
118        } catch (IOException e) {
119          CoprocessorRpcUtils.setControllerException(controller, e);
120          sumResult = -1;
121          LOG.info("Setting sum result to -1 to indicate error", e);
122        }
123      }
124    }
125    done.run(ColumnAggregationNullResponseSumResponse.newBuilder().setSum(sumResult)
126      .build());
127    LOG.info("Returning sum " + sumResult + " for region " +
128        Bytes.toStringBinary(env.getRegion().getRegionInfo().getRegionName()));
129  }
130}