001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import com.google.protobuf.RpcCallback; 021import com.google.protobuf.RpcController; 022import com.google.protobuf.Service; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.CoprocessorEnvironment; 032import org.apache.hadoop.hbase.DoNotRetryIOException; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.client.Scan; 035import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos; 036import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest; 037import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse; 038import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 039import org.apache.hadoop.hbase.regionserver.InternalScanner; 040import org.apache.hadoop.hbase.regionserver.Region; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Test coprocessor endpoint that always throws a {@link DoNotRetryIOException} for requests on 047 * the last region in the table. This allows tests to ensure correct error handling of 048 * coprocessor endpoints throwing exceptions. 049 */ 050public class ColumnAggregationEndpointWithErrors 051 extends ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors 052 implements RegionCoprocessor { 053 private static final Logger LOG = 054 LoggerFactory.getLogger(ColumnAggregationEndpointWithErrors.class); 055 056 private RegionCoprocessorEnvironment env = null; 057 058 @Override 059 public Iterable<Service> getServices() { 060 return Collections.singleton(this); 061 } 062 063 @Override 064 public void start(CoprocessorEnvironment env) throws IOException { 065 if (env instanceof RegionCoprocessorEnvironment) { 066 this.env = (RegionCoprocessorEnvironment)env; 067 return; 068 } 069 throw new CoprocessorException("Must be loaded on a table region!"); 070 } 071 072 @Override 073 public void stop(CoprocessorEnvironment env) throws IOException { 074 // Nothing to do. 075 } 076 077 @Override 078 public void sum(RpcController controller, ColumnAggregationWithErrorsSumRequest request, 079 RpcCallback<ColumnAggregationWithErrorsSumResponse> done) { 080 // aggregate at each region 081 Scan scan = new Scan(); 082 // Family is required in pb. Qualifier is not. 083 byte[] family = request.getFamily().toByteArray(); 084 byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null; 085 if (request.hasQualifier()) { 086 scan.addColumn(family, qualifier); 087 } else { 088 scan.addFamily(family); 089 } 090 int sumResult = 0; 091 InternalScanner scanner = null; 092 try { 093 Region region = this.env.getRegion(); 094 // throw an exception for requests to the last region in the table, to test error handling 095 if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) { 096 throw new DoNotRetryIOException("An expected exception"); 097 } 098 scanner = region.getScanner(scan); 099 List<Cell> curVals = new ArrayList<>(); 100 boolean hasMore = false; 101 do { 102 curVals.clear(); 103 hasMore = scanner.next(curVals); 104 for (Cell kv : curVals) { 105 if (CellUtil.matchingQualifier(kv, qualifier)) { 106 sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset()); 107 } 108 } 109 } while (hasMore); 110 } catch (IOException e) { 111 CoprocessorRpcUtils.setControllerException(controller, e); 112 // Set result to -1 to indicate error. 113 sumResult = -1; 114 LOG.info("Setting sum result to -1 to indicate error", e); 115 } finally { 116 if (scanner != null) { 117 try { 118 scanner.close(); 119 } catch (IOException e) { 120 CoprocessorRpcUtils.setControllerException(controller, e); 121 sumResult = -1; 122 LOG.info("Setting sum result to -1 to indicate error", e); 123 } 124 } 125 } 126 done.run(ColumnAggregationWithErrorsSumResponse.newBuilder().setSum(sumResult).build()); 127 } 128}