/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases to verify coprocessor Endpoint invocation on region servers and on the master.
 */
@Category({CoprocessorTests.class, MediumTests.class})
public class TestCoprocessorEndpoint {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCoprocessorEndpoint.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCoprocessorEndpoint.class);

  private static final TableName TEST_TABLE =
      TableName.valueOf("TestCoprocessorEndpoint");
  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
  private static byte[] ROW = Bytes.toBytes("testRow");

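  // makeN(ROW, ROWSIZE) produces ROWSIZE row keys; the table is pre-split at
  // ROWS[rowSeperator1] and ROWS[rowSeperator2], giving three regions that the
  // per-region assertions below rely on.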
  private static final int ROWSIZE = 20;
  private static final int rowSeperator1 = 5;
  private static final int rowSeperator2 = 12;
  private static byte[][] ROWS = makeN(ROW, ROWSIZE);

  private static HBaseTestingUtility util = new HBaseTestingUtility();

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    // Set the configuration to indicate which coprocessors should be loaded.
    Configuration conf = util.getConfiguration();
    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
        ProtobufCoprocessorService.class.getName());
    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
        ProtobufCoprocessorService.class.getName());
    util.startMiniCluster(2);
    Admin admin = util.getAdmin();
    HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
    admin.createTable(desc, new byte[][] { ROWS[rowSeperator1], ROWS[rowSeperator2] });
    util.waitUntilAllRegionsAssigned(TEST_TABLE);

    Table table = util.getConnection().getTable(TEST_TABLE);
    for (int i = 0; i < ROWSIZE; i++) {
      Put put = new Put(ROWS[i]);
      put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
      table.put(put);
    }
    table.close();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  private Map<byte[], Long> sum(final Table table, final byte[] family,
      final byte[] qualifier, final byte[] start, final byte[] end)
      throws ServiceException, Throwable {
    return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
        start, end,
        new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
          @Override
          public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
              throws IOException {
            CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
                new CoprocessorRpcUtils.BlockingRpcCallback<>();
            ColumnAggregationProtos.SumRequest.Builder builder =
                ColumnAggregationProtos.SumRequest.newBuilder();
            builder.setFamily(ByteStringer.wrap(family));
            if (qualifier != null && qualifier.length > 0) {
              builder.setQualifier(ByteStringer.wrap(qualifier));
            }
            instance.sum(null, builder.build(), rpcCallback);
            return rpcCallback.get().getSum();
          }
        });
  }

  @Test
  public void testAggregation() throws Throwable {
    Table table = util.getConnection().getTable(TEST_TABLE);
    // scan: for all regions
    Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
        ROWS[0], ROWS[ROWS.length - 1]);
    int sumResult = 0;
    int expectedResult = 0;
    for (Map.Entry<byte[], Long> e : results.entrySet()) {
      LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      sumResult += e.getValue();
    }
    for (int i = 0; i < ROWSIZE; i++) {
      expectedResult += i;
    }
    assertEquals("Invalid result", expectedResult, sumResult);

    results.clear();

    // scan: for region 2 and region 3
    results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
        ROWS[rowSeperator1], ROWS[ROWS.length - 1]);
    sumResult = 0;
    expectedResult = 0;
    for (Map.Entry<byte[], Long> e : results.entrySet()) {
      LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      sumResult += e.getValue();
    }
    for (int i = rowSeperator1; i < ROWSIZE; i++) {
      expectedResult += i;
    }
    assertEquals("Invalid result", expectedResult, sumResult);
    table.close();
  }

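  // Exercises Table.coprocessorService() with both a Batch.Call (invoking the echo RPC
  // on each region) and a Batch.Callback (collecting the per-region responses), first
  // across all three regions and then across the last two only.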
  @Test
  public void testCoprocessorService() throws Throwable {
    Table table = util.getConnection().getTable(TEST_TABLE);

    List<HRegionLocation> regions;
    try (RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) {
      regions = rl.getAllRegionLocations();
    }
    final TestProtos.EchoRequestProto request =
        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
    final Map<byte[], String> results = Collections.synchronizedMap(
        new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
    try {
      // scan: for all regions
      final RpcController controller = new ServerRpcController();
      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
          ROWS[0], ROWS[ROWS.length - 1],
          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
            @Override
            public TestProtos.EchoResponseProto call(
                TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
              LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
              CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
                  new CoprocessorRpcUtils.BlockingRpcCallback<>();
              instance.echo(controller, request, callback);
              TestProtos.EchoResponseProto response = callback.get();
              LOG.debug("Batch.Call returning result " + response);
              return response;
            }
          },
          new Batch.Callback<TestProtos.EchoResponseProto>() {
            @Override
            public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
              assertNotNull(result);
              assertEquals("hello", result.getMessage());
              results.put(region, result.getMessage());
            }
          }
      );
      for (Map.Entry<byte[], String> e : results.entrySet()) {
        LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      }
      assertEquals(3, results.size());
      for (HRegionLocation info : regions) {
        LOG.info("Region info is " + info.getRegionInfo().getRegionNameAsString());
        assertTrue(results.containsKey(info.getRegionInfo().getRegionName()));
      }
      results.clear();

      // scan: for region 2 and region 3
      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
          ROWS[rowSeperator1], ROWS[ROWS.length - 1],
          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
            @Override
            public TestProtos.EchoResponseProto call(
                TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
              LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
              CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
                  new CoprocessorRpcUtils.BlockingRpcCallback<>();
              instance.echo(controller, request, callback);
              TestProtos.EchoResponseProto response = callback.get();
              LOG.debug("Batch.Call returning result " + response);
              return response;
            }
          },
          new Batch.Callback<TestProtos.EchoResponseProto>() {
            @Override
            public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
              assertNotNull(result);
              assertEquals("hello", result.getMessage());
              results.put(region, result.getMessage());
            }
          }
      );
      for (Map.Entry<byte[], String> e : results.entrySet()) {
        LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      }
      assertEquals(2, results.size());
    } finally {
      table.close();
    }
  }

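  // Verifies that a Batch.Call returning null is surfaced as a null entry in the
  // result map for every region rather than failing the service invocation.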
  @Test
  public void testCoprocessorServiceNullResponse() throws Throwable {
    Table table = util.getConnection().getTable(TEST_TABLE);
    List<HRegionLocation> regions;
    try (RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) {
      regions = rl.getAllRegionLocations();
    }

    final TestProtos.EchoRequestProto request =
        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
    try {
      // scan: for all regions
      final RpcController controller = new ServerRpcController();
      // test that null results are supported
      Map<byte[], String> results =
          table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
              ROWS[0], ROWS[ROWS.length - 1],
              new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
                @Override
                public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
                    throws IOException {
                  CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
                      new CoprocessorRpcUtils.BlockingRpcCallback<>();
                  instance.echo(controller, request, callback);
                  TestProtos.EchoResponseProto response = callback.get();
                  LOG.debug("Batch.Call got result " + response);
                  return null;
                }
              }
          );
      for (Map.Entry<byte[], String> e : results.entrySet()) {
        LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
      }
      assertEquals(3, results.size());
      for (HRegionLocation region : regions) {
        HRegionInfo info = region.getRegionInfo();
        LOG.info("Region info is " + info.getRegionNameAsString());
        assertTrue(results.containsKey(info.getRegionName()));
        assertNull(results.get(info.getRegionName()));
      }
    } finally {
      table.close();
    }
  }

  @Test
  public void testMasterCoprocessorService() throws Throwable {
    Admin admin = util.getAdmin();
    final TestProtos.EchoRequestProto request =
        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
    assertEquals("hello", service.echo(null, request).getMessage());
  }

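  // The error RPC is expected to fail on the region server and reach the client
  // as a ServiceException from the blocking stub.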
  @Test
  public void testCoprocessorError() throws Exception {
    Configuration configuration = new Configuration(util.getConfiguration());
    // Make it not retry forever
    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    Table table = util.getConnection().getTable(TEST_TABLE);

    try {
      CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);

      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);

      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
      fail("Should have thrown an exception");
    } catch (ServiceException e) {
      // expected
    } finally {
      table.close();
    }
  }

  @Test
  public void testMasterCoprocessorError() throws Throwable {
    Admin admin = util.getAdmin();
    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
    try {
      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
      fail("Should have thrown an exception");
    } catch (ServiceException e) {
      // expected
    }
  }

  private static byte[][] makeN(byte[] base, int n) {
    byte[][] ret = new byte[n][];
    for (int i = 0; i < n; i++) {
      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
    }
    return ret;
  }
}