001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import com.google.protobuf.RpcCallback; 021import com.google.protobuf.RpcController; 022import com.google.protobuf.Service; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.CoprocessorEnvironment; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.client.Scan; 034import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest; 035import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse; 036import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse; 037import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 038import org.apache.hadoop.hbase.regionserver.InternalScanner; 039import org.apache.hadoop.hbase.regionserver.Region; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * Test coprocessor endpoint that always returns {@code null} for requests to the last region 046 * in the table. This allows tests to provide assurance of correct {@code null} handling for 047 * response values. 048 */ 049public class ColumnAggregationEndpointNullResponse extends ColumnAggregationServiceNullResponse 050 implements RegionCoprocessor { 051 private static final Logger LOG = 052 LoggerFactory.getLogger(ColumnAggregationEndpointNullResponse.class); 053 054 private RegionCoprocessorEnvironment env = null; 055 056 @Override 057 public Iterable<Service> getServices() { 058 return Collections.singleton(this); 059 } 060 061 @Override 062 public void start(CoprocessorEnvironment env) throws IOException { 063 if (env instanceof RegionCoprocessorEnvironment) { 064 this.env = (RegionCoprocessorEnvironment)env; 065 return; 066 } 067 throw new CoprocessorException("Must be loaded on a table region!"); 068 } 069 070 @Override 071 public void stop(CoprocessorEnvironment env) throws IOException { 072 // Nothing to do. 073 } 074 075 @Override 076 public void sum(RpcController controller, ColumnAggregationNullResponseSumRequest request, 077 RpcCallback<ColumnAggregationNullResponseSumResponse> done) { 078 // aggregate at each region 079 Scan scan = new Scan(); 080 // Family is required in pb. Qualifier is not. 081 byte[] family = request.getFamily().toByteArray(); 082 byte[] qualifier = request.hasQualifier() ? request.getQualifier().toByteArray() : null; 083 if (request.hasQualifier()) { 084 scan.addColumn(family, qualifier); 085 } else { 086 scan.addFamily(family); 087 } 088 int sumResult = 0; 089 InternalScanner scanner = null; 090 try { 091 Region region = this.env.getRegion(); 092 // for the last region in the table, return null to test null handling 093 if (Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)) { 094 done.run(null); 095 return; 096 } 097 scanner = region.getScanner(scan); 098 List<Cell> curVals = new ArrayList<>(); 099 boolean hasMore = false; 100 do { 101 curVals.clear(); 102 hasMore = scanner.next(curVals); 103 for (Cell kv : curVals) { 104 if (CellUtil.matchingQualifier(kv, qualifier)) { 105 sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset()); 106 } 107 } 108 } while (hasMore); 109 } catch (IOException e) { 110 CoprocessorRpcUtils.setControllerException(controller, e); 111 // Set result to -1 to indicate error. 112 sumResult = -1; 113 LOG.info("Setting sum result to -1 to indicate error", e); 114 } finally { 115 if (scanner != null) { 116 try { 117 scanner.close(); 118 } catch (IOException e) { 119 CoprocessorRpcUtils.setControllerException(controller, e); 120 sumResult = -1; 121 LOG.info("Setting sum result to -1 to indicate error", e); 122 } 123 } 124 } 125 done.run(ColumnAggregationNullResponseSumResponse.newBuilder().setSum(sumResult) 126 .build()); 127 LOG.info("Returning sum " + sumResult + " for region " + 128 Bytes.toStringBinary(env.getRegion().getRegionInfo().getRegionName())); 129 } 130}