/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingResponse;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises a custom protobuf coprocessor endpoint ({@link PingHandler}) through
 * {@code Table.coprocessorService}. Each test runs against a table that is pre-split at
 * {@code bbb} and {@code ccc}, i.e. three regions.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestServerCustomProtocol {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestServerCustomProtocol.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestServerCustomProtocol.class);
  /** Reply sent by {@code hello} when the request carries no name. */
  static final String WHOAREYOU = "Who are you?";
  /** Name that makes {@code hello} answer with a response that has no text set. */
  static final String NOBODY = "nobody";
  /** Prefix of the {@code hello} reply for any other name. */
  static final String HELLO = "Hello, ";

  /**
   * Test protocol implementation: a region coprocessor that exposes itself as the
   * {@code PingService} endpoint and keeps a simple per-instance counter.
   */
  public static class PingHandler extends PingProtos.PingService implements RegionCoprocessor {
    // Bumped by ping() and increment(), reported by count().
    // NOTE(review): plain int with no synchronization -- presumably each region gets its own
    // instance and the tests call it serially; confirm before reusing this elsewhere.
    private int counter = 0;

    /**
     * Accepts only region-level environments.
     *
     * @throws CoprocessorException if {@code env} is not a {@link RegionCoprocessorEnvironment}
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
      if (env instanceof RegionCoprocessorEnvironment) {
        return;
      }
      throw new CoprocessorException("Must be loaded on a table region!");
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
      // Nothing to do.
    }

    /** Increments the counter by one and answers {@code "pong"}. */
    @Override
    public void ping(RpcController controller, PingRequest request,
        RpcCallback<PingResponse> done) {
      this.counter++;
      done.run(PingResponse.newBuilder().setPong("pong").build());
    }

    /** Answers with the current counter value. */
    @Override
    public void count(RpcController controller, CountRequest request,
        RpcCallback<CountResponse> done) {
      done.run(CountResponse.newBuilder().setCount(this.counter).build());
    }

    /** Adds the requested diff to the counter and answers with the new value. */
    @Override
    public void increment(RpcController controller,
        IncrementCountRequest request, RpcCallback<IncrementCountResponse> done) {
      this.counter += request.getDiff();
      done.run(IncrementCountResponse.newBuilder().setCount(this.counter).build());
    }

    /**
     * Answers {@link #WHOAREYOU} when no name is set, a response without text when the name is
     * {@link #NOBODY}, and {@link #HELLO} followed by the name otherwise.
     */
    @Override
    public void hello(RpcController controller, HelloRequest request,
        RpcCallback<HelloResponse> done) {
      if (!request.hasName()) {
        done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
      } else if (request.getName().equals(NOBODY)) {
        done.run(HelloResponse.newBuilder().build());
      } else {
        done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
      }
    }

    /** Answers with an empty response. */
    @Override
    public void noop(RpcController controller, NoopRequest request,
        RpcCallback<NoopResponse> done) {
      done.run(NoopResponse.newBuilder().build());
    }

    @Override
    public Iterable<Service> getServices() {
      return Collections.singleton(this);
    }
  }

  private static final TableName TEST_TABLE = TableName.valueOf("test");
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");

  private static final byte[] ROW_A = Bytes.toBytes("aaa");
  private static final byte[] ROW_B = Bytes.toBytes("bbb");
  private static final byte[] ROW_C = Bytes.toBytes("ccc");

  private static final byte[] ROW_AB = Bytes.toBytes("abb");
  private static final byte[] ROW_BC = Bytes.toBytes("bcc");

  private static HBaseTestingUtility util = new HBaseTestingUtility();

  /** Registers {@link PingHandler} as a region coprocessor and starts the mini cluster. */
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      PingHandler.class.getName());
    util.startMiniCluster();
  }

  /** Creates the test table split at {@code bbb} and {@code ccc} and writes one row per region. */
  @Before
  public void before() throws Exception {
    final byte[][] splitKeys = new byte[][] { ROW_B, ROW_C };
    // Close the handle once the fixture rows are written; the tests open their own Table.
    try (Table table = util.createTable(TEST_TABLE, TEST_FAMILY, splitKeys)) {
      for (byte[] row : new byte[][] { ROW_A, ROW_B, ROW_C }) {
        Put put = new Put(row);
        put.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
        table.put(put);
      }
    }
  }

  @After
  public void after() throws Exception {
    util.deleteTable(TEST_TABLE);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Calls ping, hello, count and increment across all regions and checks every region's answer.
   */
  @Test
  public void testSingleProxy() throws Throwable {
    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
      Map<byte [], String> results = ping(table, null, null);
      // There are three regions so should get back three results.
      assertEquals(3, results.size());
      for (Map.Entry<byte [], String> e: results.entrySet()) {
        assertEquals("Invalid custom protocol response", "pong", e.getValue());
      }
      hello(table, "George", HELLO + "George");
      LOG.info("Did george");
      hello(table, null, WHOAREYOU);
      LOG.info("Who are you");
      hello(table, NOBODY, null);
      LOG.info(NOBODY);

      Map<byte [], Integer> intResults = fetchCounts(table);
      // There are three regions so should get back three results.
      assertEquals(3, intResults.size());
      int count = -1;
      for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
        assertTrue(e.getValue() > 0);
        count = e.getValue();
      }

      final int diff = 5;
      intResults = incrementCounts(table, diff);
      // There are three regions so should get back three results.
      assertEquals(3, intResults.size());
      for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
        assertEquals(count + diff, e.getValue().intValue());
      }
    }
  }

  /** Invokes {@code count} on every region of {@code table}; results are keyed by region name. */
  private Map<byte [], Integer> fetchCounts(final Table table)
      throws ServiceException, Throwable {
    return table.coprocessorService(PingProtos.PingService.class,
      null, null,
      new Batch.Call<PingProtos.PingService, Integer>() {
        @Override
        public Integer call(PingProtos.PingService instance) throws IOException {
          CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.CountResponse> rpcCallback =
            new CoprocessorRpcUtils.BlockingRpcCallback<>();
          instance.count(null, PingProtos.CountRequest.newBuilder().build(), rpcCallback);
          return rpcCallback.get().getCount();
        }
      });
  }

  /**
   * Invokes {@code increment} with {@code diff} on every region of {@code table}; results (the
   * new counter values) are keyed by region name.
   */
  private Map<byte [], Integer> incrementCounts(final Table table, final int diff)
      throws ServiceException, Throwable {
    return table.coprocessorService(PingProtos.PingService.class,
      null, null,
      new Batch.Call<PingProtos.PingService, Integer>() {
        @Override
        public Integer call(PingProtos.PingService instance) throws IOException {
          CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.IncrementCountResponse> rpcCallback =
            new CoprocessorRpcUtils.BlockingRpcCallback<>();
          instance.increment(null,
            PingProtos.IncrementCountRequest.newBuilder().setDiff(diff).build(),
            rpcCallback);
          return rpcCallback.get().getCount();
        }
      });
  }

  /**
   * Calls {@code hello} on all regions and asserts that each region answered {@code response}.
   *
   * @param send name to send, or {@code null} to leave the name unset
   * @param response expected reply from every region (may be {@code null})
   * @return the per-region replies
   */
  private Map<byte [], String> hello(final Table table, final String send, final String response)
      throws ServiceException, Throwable {
    Map<byte [], String> results = hello(table, send);
    for (Map.Entry<byte [], String> e: results.entrySet()) {
      assertEquals("Invalid custom protocol response", response, e.getValue());
    }
    return results;
  }

  /** Calls {@code hello} on all regions (no row-range restriction). */
  private Map<byte [], String> hello(final Table table, final String send)
      throws ServiceException, Throwable {
    return hello(table, send, null, null);
  }

  /**
   * Calls {@code hello} on the regions selected by {@code start}/{@code end}.
   *
   * @param send name to send, or {@code null} to leave the name unset
   * @return per-region reply text, {@code null} where the response carried no text
   */
  private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
      final byte [] end) throws ServiceException, Throwable {
    return table.coprocessorService(PingProtos.PingService.class,
      start, end,
      new Batch.Call<PingProtos.PingService, String>() {
        @Override
        public String call(PingProtos.PingService instance) throws IOException {
          CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
            new CoprocessorRpcUtils.BlockingRpcCallback<>();
          PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
          if (send != null) {
            builder.setName(send);
          }
          instance.hello(null, builder.build(), rpcCallback);
          return responseOrNull(rpcCallback.get());
        }
      });
  }

  /**
   * On each selected region, calls {@code ping} and feeds its reply as the name into
   * {@code hello} on the same service instance.
   */
  private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
      final byte [] end) throws ServiceException, Throwable {
    return table.coprocessorService(PingProtos.PingService.class,
      start, end,
      new Batch.Call<PingProtos.PingService, String>() {
        @Override
        public String call(PingProtos.PingService instance) throws IOException {
          CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
            new CoprocessorRpcUtils.BlockingRpcCallback<>();
          PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
          // Call ping on same instance.  Use result calling hello on same instance.
          builder.setName(doPing(instance));
          instance.hello(null, builder.build(), rpcCallback);
          return responseOrNull(rpcCallback.get());
        }
      });
  }

  /** Returns the text of {@code r}, or {@code null} if {@code r} is null or has no text set. */
  private static String responseOrNull(PingProtos.HelloResponse r) {
    return r != null && r.hasResponse() ? r.getResponse() : null;
  }

  /** Calls {@code noop} on the selected regions; every region maps to {@code null}. */
  private Map<byte [], String> noop(final Table table, final byte [] start, final byte [] end)
      throws ServiceException, Throwable {
    return table.coprocessorService(PingProtos.PingService.class, start, end,
      new Batch.Call<PingProtos.PingService, String>() {
        @Override
        public String call(PingProtos.PingService instance) throws IOException {
          CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback =
            new CoprocessorRpcUtils.BlockingRpcCallback<>();
          PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder();
          instance.noop(null, builder.build(), rpcCallback);
          rpcCallback.get();
          // Looks like null is expected when void.  That is what the test below is looking for
          return null;
        }
      });
  }

  /** Restricting the call to rows up to {@code aaa} must reach the first region only. */
  @Test
  public void testSingleMethod() throws Throwable {
    try (Table table = util.getConnection().getTable(TEST_TABLE);
        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
      Map<byte [], String> results = ping(table, null, ROW_A);
      // Should have gotten results for 1 of the three regions only since we specified
      // rows from 1 region
      assertEquals(1, results.size());
      verifyRegionResults(locator, results, ROW_A);

      final String name = "NAME";
      results = hello(table, name, null, ROW_A);
      // Should have gotten results for 1 of the three regions only since we specified
      // rows from 1 region
      assertEquals(1, results.size());
      verifyRegionResults(locator, results, HELLO + name, ROW_A);
    }
  }

  /** Checks which regions answer for various start/end row combinations. */
  @Test
  public void testRowRange() throws Throwable {
    try (Table table = util.getConnection().getTable(TEST_TABLE);
        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
      for (HRegionLocation e: locator.getAllRegionLocations()) {
        LOG.info("Region {}, servername={}", e.getRegionInfo().getRegionNameAsString(),
          e.getServerName());
      }
      // Here are what regions looked like on a run:
      //
      // test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d.
      // test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e.
      // test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74.

      Map<byte [], String> results = ping(table, null, ROW_A);
      // Should contain first region only.
      assertEquals(1, results.size());
      verifyRegionResults(locator, results, ROW_A);

      // Test start row + empty end
      results = ping(table, ROW_BC, null);
      assertEquals(2, results.size());
      // should contain last 2 regions
      HRegionLocation loc = locator.getRegionLocation(ROW_A, true);
      assertNull("Should be missing region for row aaa (prior to start row)",
        results.get(loc.getRegionInfo().getRegionName()));
      verifyRegionResults(locator, results, ROW_B);
      verifyRegionResults(locator, results, ROW_C);

      // test empty start + end
      results = ping(table, null, ROW_BC);
      // should contain the first 2 regions
      assertEquals(2, results.size());
      verifyRegionResults(locator, results, ROW_A);
      verifyRegionResults(locator, results, ROW_B);
      loc = locator.getRegionLocation(ROW_C, true);
      assertNull("Should be missing region for row ccc (past stop row)",
        results.get(loc.getRegionInfo().getRegionName()));

      // test explicit start + end
      results = ping(table, ROW_AB, ROW_BC);
      // should contain first 2 regions
      assertEquals(2, results.size());
      verifyRegionResults(locator, results, ROW_A);
      verifyRegionResults(locator, results, ROW_B);
      loc = locator.getRegionLocation(ROW_C, true);
      assertNull("Should be missing region for row ccc (past stop row)",
        results.get(loc.getRegionInfo().getRegionName()));

      // test single region
      results = ping(table, ROW_B, ROW_BC);
      // should only contain region bbb
      assertEquals(1, results.size());
      verifyRegionResults(locator, results, ROW_B);
      loc = locator.getRegionLocation(ROW_A, true);
      assertNull("Should be missing region for row aaa (prior to start)",
        results.get(loc.getRegionInfo().getRegionName()));
      loc = locator.getRegionLocation(ROW_C, true);
      assertNull("Should be missing region for row ccc (past stop row)",
        results.get(loc.getRegionInfo().getRegionName()));
    }
  }

  /** Calls {@code ping} on the regions selected by {@code start}/{@code end}. */
  private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
      throws ServiceException, Throwable {
    return table.coprocessorService(PingProtos.PingService.class, start, end,
      new Batch.Call<PingProtos.PingService, String>() {
        @Override
        public String call(PingProtos.PingService instance) throws IOException {
          return doPing(instance);
        }
      });
  }

  /** Sends one ping to {@code instance}, waits for the reply and returns its pong text. */
  private static String doPing(PingProtos.PingService instance) throws IOException {
    CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
        new CoprocessorRpcUtils.BlockingRpcCallback<>();
    instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
    return rpcCallback.get().getPong();
  }

  /** Two service calls chained inside a single {@code Batch.Call} must work on every region. */
  @Test
  public void testCompoundCall() throws Throwable {
    try (Table table = util.getConnection().getTable(TEST_TABLE);
        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
      Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C);
      verifyRegionResults(locator, results, HELLO + "pong", ROW_A);
      verifyRegionResults(locator, results, HELLO + "pong", ROW_B);
      verifyRegionResults(locator, results, HELLO + "pong", ROW_C);
    }
  }

  /** A request without a name must be answered with {@link #WHOAREYOU} by every region. */
  @Test
  public void testNullCall() throws Throwable {
    try (Table table = util.getConnection().getTable(TEST_TABLE);
        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
      Map<byte[],String> results = hello(table, null, ROW_A, ROW_C);
      verifyRegionResults(locator, results, WHOAREYOU, ROW_A);
      verifyRegionResults(locator, results, WHOAREYOU, ROW_B);
      verifyRegionResults(locator, results, WHOAREYOU, ROW_C);
    }
  }

  /** A textless response must surface as a {@code null} value under each region's key. */
  @Test
  public void testNullReturn() throws Throwable {
    try (Table table = util.getConnection().getTable(TEST_TABLE);
        RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
      Map<byte[],String> results = hello(table, NOBODY, ROW_A, ROW_C);
      verifyRegionResults(locator, results, null, ROW_A);
      verifyRegionResults(locator, results, null, ROW_B);
      verifyRegionResults(locator, results, null, ROW_C);
    }
  }

  /** A call that returns nothing must still yield one (null-valued) entry per region. */
  @Test
  public void testEmptyReturnType() throws Throwable {
    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
      Map<byte[],String> results = noop(table, ROW_A, ROW_C);
      assertEquals("Should have results from three regions", 3, results.size());
      // all results should be null
      for (Object v : results.values()) {
        assertNull(v);
      }
    }
  }

  /** Asserts that the region hosting {@code row} answered {@code "pong"}. */
  private void verifyRegionResults(RegionLocator regionLocator, Map<byte[],String> results,
      byte[] row) throws Exception {
    verifyRegionResults(regionLocator, results, "pong", row);
  }

  /**
   * Asserts that {@code results} has an entry for the region hosting {@code row} and that the
   * entry's value equals {@code expected}. All entries are logged first to ease debugging.
   *
   * @param regionLocator used to resolve {@code row} to its region name
   * @param results per-region results keyed by region name
   * @param expected expected value for that region (may be {@code null})
   * @param row a row key inside the region to check
   */
  private void verifyRegionResults(RegionLocator regionLocator, Map<byte[], String> results,
      String expected, byte[] row) throws Exception {
    for (Map.Entry<byte [], String> e: results.entrySet()) {
      LOG.info("row={}, expected={}, result key={}, value={}", Bytes.toString(row), expected,
        Bytes.toString(e.getKey()), e.getValue());
    }
    HRegionLocation loc = regionLocator.getRegionLocation(row, true);
    byte[] region = loc.getRegionInfo().getRegionName();
    assertTrue("Results should contain region " +
        Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'",
      results.containsKey(region));
    assertEquals("Invalid result for row '"+Bytes.toStringBinary(row)+"'",
      expected, results.get(region));
  }
}