001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import com.google.protobuf.RpcCallback;
021import com.google.protobuf.RpcController;
022import com.google.protobuf.Service;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.CoprocessorEnvironment;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
036import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest;
037import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse;
038import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
039import org.apache.hadoop.hbase.regionserver.InternalScanner;
040import org.apache.hadoop.hbase.regionserver.Region;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on
047 * the last region in the table.  This allows tests to ensure correct error handling of
048 * coprocessor endpoints throwing exceptions.
049 */
050public class ColumnAggregationEndpointWithErrors
051        extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors
052        implements RegionCoprocessor {
053  private static final Logger LOG =
054      LoggerFactory.getLogger(ColumnAggregationEndpointWithErrors.class);
055
056  private RegionCoprocessorEnvironment env = null;
057
058  @Override
059  public Iterable<Service> getServices() {
060    return Collections.singleton(this);
061  }
062
063  @Override
064  public void start(CoprocessorEnvironment env) throws IOException {
065    if (env instanceof RegionCoprocessorEnvironment) {
066      this.env = (RegionCoprocessorEnvironment)env;
067      return;
068    }
069    throw new CoprocessorException("Must be loaded on a table region!");
070  }
071
072  @Override
073  public void stop(CoprocessorEnvironment env) throws IOException {
074    // Nothing to do.
075  }
076
077  @Override
078  public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request,
079          RpcCallback<ColumnAggregationWithErrorsSumResponse> done) {
080    // aggregate at each region
081    Scan scan = new Scan();
082    // Family is required in pb. Qualifier is not.
083    byte[] family = request.getFamily().toByteArray();
084    byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null;
085    if (request.hasQualifier()) {
086      scan.addColumn(family, qualifier);
087    } else {
088      scan.addFamily(family);
089    }
090    int sumResult = 0;
091    InternalScanner scanner = null;
092    try {
093      Region region = this.env.getRegion();
094      // throw an exception for requests to the last region in the table, to test error handling
095      if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) {
096        throw new DoNotRetryIOException("An expected exception");
097      }
098      scanner = region.getScanner(scan);
099      List<Cell> curVals = new ArrayList<>();
100      boolean hasMore = false;
101      do {
102        curVals.clear();
103        hasMore = scanner.next(curVals);
104        for (Cell kv : curVals) {
105          if (CellUtil.matchingQualifier(kv, qualifier)) {
106            sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset());
107          }
108        }
109      } while (hasMore);
110    } catch (IOException e) {
111      CoprocessorRpcUtils.setControllerException(controller, e);
112      // Set result to -1 to indicate error.
113      sumResult = -1;
114      LOG.info("Setting sum result to -1 to indicate error", e);
115    } finally {
116      if (scanner != null) {
117        try {
118          scanner.close();
119        } catch (IOException e) {
120          CoprocessorRpcUtils.setControllerException(controller, e);
121          sumResult = -1;
122          LOG.info("Setting sum result to -1 to indicate error", e);
123        }
124      }
125    }
126    done.run(ColumnAggregationWithErrorsSumResponse.newBuilder().setSum(sumResult).build());
127  }
128}