001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertTrue; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.net.SocketTimeoutException; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Objects; 029import java.util.Random; 030import java.util.SortedMap; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ConcurrentSkipListMap; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.commons.lang3.NotImplementedException; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.conf.Configured; 040import org.apache.hadoop.hbase.CellComparatorImpl; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.HRegionInfo; 046import org.apache.hadoop.hbase.HRegionLocation; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.MetaTableAccessor; 049import org.apache.hadoop.hbase.RegionLocations; 050import org.apache.hadoop.hbase.RegionTooBusyException; 051import org.apache.hadoop.hbase.ServerName; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 054import org.apache.hadoop.hbase.security.User; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.SmallTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.Pair; 059import org.apache.hadoop.hbase.util.Threads; 060import org.apache.hadoop.util.Tool; 061import org.apache.hadoop.util.ToolRunner; 062import org.junit.Before; 063import org.junit.ClassRule; 064import org.junit.Ignore; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.mockito.Mockito; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch; 072import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 073import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 074import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 075import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 076 077import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 101 102/** 103 * Test client behavior w/o setting up a cluster. 104 * Mock up cluster emissions. 105 */ 106@Category({ClientTests.class, SmallTests.class}) 107public class TestClientNoCluster extends Configured implements Tool { 108 109 @ClassRule 110 public static final HBaseClassTestRule CLASS_RULE = 111 HBaseClassTestRule.forClass(TestClientNoCluster.class); 112 113 private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class); 114 private Configuration conf; 115 public static final ServerName META_SERVERNAME = 116 ServerName.valueOf("meta.example.org", 16010, 12345); 117 118 @Before 119 public void setUp() throws Exception { 120 this.conf = HBaseConfiguration.create(); 121 // Run my Connection overrides. Use my little ConnectionImplementation below which 122 // allows me insert mocks and also use my Registry below rather than the default zk based 123 // one so tests run faster and don't have zk dependency. 124 this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 125 } 126 127 /** 128 * Simple cluster registry inserted in place of our usual zookeeper based one. 129 */ 130 static class SimpleRegistry extends DoNothingAsyncRegistry { 131 final ServerName META_HOST = META_SERVERNAME; 132 133 public SimpleRegistry(Configuration conf) { 134 super(conf); 135 } 136 137 @Override 138 public CompletableFuture<RegionLocations> getMetaRegionLocation() { 139 return CompletableFuture.completedFuture(new RegionLocations( 140 new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST))); 141 } 142 143 @Override 144 public CompletableFuture<String> getClusterId() { 145 return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT); 146 } 147 148 @Override 149 public CompletableFuture<Integer> getCurrentNrHRS() { 150 return CompletableFuture.completedFuture(1); 151 } 152 } 153 154 /** 155 * Remove the @Ignore to try out timeout and retry asettings 156 * @throws IOException 157 */ 158 @Ignore 159 @Test 160 public void testTimeoutAndRetries() throws IOException { 161 Configuration localConfig = HBaseConfiguration.create(this.conf); 162 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 163 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 164 Connection connection = ConnectionFactory.createConnection(localConfig); 165 Table table = connection.getTable(TableName.META_TABLE_NAME); 166 Throwable t = null; 167 LOG.info("Start"); 168 try { 169 // An exists call turns into a get w/ a flag. 170 table.exists(new Get(Bytes.toBytes("abc"))); 171 } catch (SocketTimeoutException e) { 172 // I expect this exception. 173 LOG.info("Got expected exception", e); 174 t = e; 175 } catch (RetriesExhaustedException e) { 176 // This is the old, unwanted behavior. If we get here FAIL!!! 177 fail(); 178 } finally { 179 table.close(); 180 } 181 connection.close(); 182 LOG.info("Stop"); 183 assertTrue(t != null); 184 } 185 186 /** 187 * Test that operation timeout prevails over rpc default timeout and retries, etc. 188 * @throws IOException 189 */ 190 @Test 191 public void testRpcTimeout() throws IOException { 192 Configuration localConfig = HBaseConfiguration.create(this.conf); 193 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 194 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 195 int pause = 10; 196 localConfig.setInt("hbase.client.pause", pause); 197 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10); 198 // Set the operation timeout to be < the pause. Expectation is that after first pause, we will 199 // fail out of the rpc because the rpc timeout will have been set to the operation tiemout 200 // and it has expired. Otherwise, if this functionality is broke, all retries will be run -- 201 // all ten of them -- and we'll get the RetriesExhaustedException exception. 202 localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1); 203 Connection connection = ConnectionFactory.createConnection(localConfig); 204 Table table = connection.getTable(TableName.META_TABLE_NAME); 205 Throwable t = null; 206 try { 207 // An exists call turns into a get w/ a flag. 208 table.exists(new Get(Bytes.toBytes("abc"))); 209 } catch (SocketTimeoutException e) { 210 // I expect this exception. 211 LOG.info("Got expected exception", e); 212 t = e; 213 } catch (RetriesExhaustedException e) { 214 // This is the old, unwanted behavior. If we get here FAIL!!! 215 fail(); 216 } finally { 217 table.close(); 218 connection.close(); 219 } 220 assertTrue(t != null); 221 } 222 223 @Test 224 public void testDoNotRetryMetaTableAccessor() throws IOException { 225 this.conf.set("hbase.client.connection.impl", 226 RegionServerStoppedOnScannerOpenConnection.class.getName()); 227 try (Connection connection = ConnectionFactory.createConnection(conf)) { 228 MetaTableAccessor.fullScanRegions(connection); 229 } 230 } 231 232 @Test 233 public void testDoNotRetryOnScanNext() throws IOException { 234 this.conf.set("hbase.client.connection.impl", 235 RegionServerStoppedOnScannerOpenConnection.class.getName()); 236 // Go against meta else we will try to find first region for the table on construction which 237 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 238 // good for a bit of testing. 239 Connection connection = ConnectionFactory.createConnection(this.conf); 240 Table table = connection.getTable(TableName.META_TABLE_NAME); 241 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 242 try { 243 Result result = null; 244 while ((result = scanner.next()) != null) { 245 LOG.info(Objects.toString(result)); 246 } 247 } finally { 248 scanner.close(); 249 table.close(); 250 connection.close(); 251 } 252 } 253 254 @Test 255 public void testRegionServerStoppedOnScannerOpen() throws IOException { 256 this.conf.set("hbase.client.connection.impl", 257 RegionServerStoppedOnScannerOpenConnection.class.getName()); 258 // Go against meta else we will try to find first region for the table on construction which 259 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 260 // good for a bit of testing. 261 Connection connection = ConnectionFactory.createConnection(conf); 262 Table table = connection.getTable(TableName.META_TABLE_NAME); 263 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 264 try { 265 Result result = null; 266 while ((result = scanner.next()) != null) { 267 LOG.info(Objects.toString(result)); 268 } 269 } finally { 270 scanner.close(); 271 table.close(); 272 connection.close(); 273 } 274 } 275 276 @Test 277 public void testConnectionClosedOnRegionLocate() throws IOException { 278 Configuration testConf = new Configuration(this.conf); 279 testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 280 // Go against meta else we will try to find first region for the table on construction which 281 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 282 // good for a bit of testing. 283 Connection connection = ConnectionFactory.createConnection(testConf); 284 Table table = connection.getTable(TableName.META_TABLE_NAME); 285 connection.close(); 286 try { 287 Get get = new Get(Bytes.toBytes("dummyRow")); 288 table.get(get); 289 fail("Should have thrown DoNotRetryException but no exception thrown"); 290 } catch (Exception e) { 291 if (!(e instanceof DoNotRetryIOException)) { 292 String errMsg = 293 "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName(); 294 LOG.error(errMsg, e); 295 fail(errMsg); 296 } 297 } finally { 298 table.close(); 299 } 300 } 301 302 /** 303 * Override to shutdown going to zookeeper for cluster id and meta location. 304 */ 305 static class RegionServerStoppedOnScannerOpenConnection 306 extends ConnectionImplementation { 307 final ClientService.BlockingInterface stub; 308 309 RegionServerStoppedOnScannerOpenConnection(Configuration conf, 310 ExecutorService pool, User user) throws IOException { 311 super(conf, pool, user); 312 // Mock up my stub so open scanner returns a scanner id and then on next, we throw 313 // exceptions for three times and then after that, we return no more to scan. 314 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 315 long sid = 12345L; 316 try { 317 Mockito.when(stub.scan((RpcController)Mockito.any(), 318 (ClientProtos.ScanRequest)Mockito.any())). 319 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()). 320 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))). 321 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid). 322 setMoreResults(false).build()); 323 } catch (ServiceException e) { 324 throw new IOException(e); 325 } 326 } 327 328 @Override 329 public BlockingInterface getClient(ServerName sn) throws IOException { 330 return this.stub; 331 } 332 } 333 334 /** 335 * Override to check we are setting rpc timeout right. 336 */ 337 static class RpcTimeoutConnection 338 extends ConnectionImplementation { 339 final ClientService.BlockingInterface stub; 340 341 RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user) 342 throws IOException { 343 super(conf, pool, user); 344 // Mock up my stub so an exists call -- which turns into a get -- throws an exception 345 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 346 try { 347 Mockito.when(stub.get((RpcController)Mockito.any(), 348 (ClientProtos.GetRequest)Mockito.any())). 349 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))); 350 } catch (ServiceException e) { 351 throw new IOException(e); 352 } 353 } 354 355 @Override 356 public BlockingInterface getClient(ServerName sn) throws IOException { 357 return this.stub; 358 } 359 } 360 361 /** 362 * Fake many regionservers and many regions on a connection implementation. 363 */ 364 static class ManyServersManyRegionsConnection 365 extends ConnectionImplementation { 366 // All access should be synchronized 367 final Map<ServerName, ClientService.BlockingInterface> serversByClient; 368 369 /** 370 * Map of faked-up rows of a 'meta table'. 371 */ 372 final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta; 373 final AtomicLong sequenceids = new AtomicLong(0); 374 private final Configuration conf; 375 376 ManyServersManyRegionsConnection(Configuration conf, 377 ExecutorService pool, User user) 378 throws IOException { 379 super(conf, pool, user); 380 int serverCount = conf.getInt("hbase.test.servers", 10); 381 this.serversByClient = new HashMap<>(serverCount); 382 this.meta = makeMeta(Bytes.toBytes( 383 conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))), 384 conf.getInt("hbase.test.regions", 100), 385 conf.getLong("hbase.test.namespace.span", 1000), 386 serverCount); 387 this.conf = conf; 388 } 389 390 @Override 391 public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { 392 // if (!sn.toString().startsWith("meta")) LOG.info(sn); 393 ClientService.BlockingInterface stub = null; 394 synchronized (this.serversByClient) { 395 stub = this.serversByClient.get(sn); 396 if (stub == null) { 397 stub = new FakeServer(this.conf, meta, sequenceids); 398 this.serversByClient.put(sn, stub); 399 } 400 } 401 return stub; 402 } 403 } 404 405 static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 406 final AtomicLong sequenceids, final MultiRequest request) { 407 // Make a response to match the request. Act like there were no failures. 408 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); 409 // Per Region. 410 RegionActionResult.Builder regionActionResultBuilder = 411 RegionActionResult.newBuilder(); 412 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 413 for (RegionAction regionAction: request.getRegionActionList()) { 414 regionActionResultBuilder.clear(); 415 // Per Action in a Region. 416 for (ClientProtos.Action action: regionAction.getActionList()) { 417 roeBuilder.clear(); 418 // Return empty Result and proper index as result. 419 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 420 roeBuilder.setIndex(action.getIndex()); 421 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 422 } 423 builder.addRegionActionResult(regionActionResultBuilder.build()); 424 } 425 return builder.build(); 426 } 427 428 /** 429 * Fake 'server'. 430 * Implements the ClientService responding as though it were a 'server' (presumes a new 431 * ClientService.BlockingInterface made per server). 432 */ 433 static class FakeServer implements ClientService.BlockingInterface { 434 private AtomicInteger multiInvocationsCount = new AtomicInteger(0); 435 private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta; 436 private final AtomicLong sequenceids; 437 private final long multiPause; 438 private final int tooManyMultiRequests; 439 440 FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 441 final AtomicLong sequenceids) { 442 this.meta = meta; 443 this.sequenceids = sequenceids; 444 445 // Pause to simulate the server taking time applying the edits. This will drive up the 446 // number of threads used over in client. 447 this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0); 448 this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3); 449 } 450 451 @Override 452 public GetResponse get(RpcController controller, GetRequest request) 453 throws ServiceException { 454 boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(), 455 request.getRegion().getType()); 456 if (!metaRegion) { 457 return doGetResponse(request); 458 } 459 return doMetaGetResponse(meta, request); 460 } 461 462 private GetResponse doGetResponse(GetRequest request) { 463 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 464 ByteString row = request.getGet().getRow(); 465 resultBuilder.addCell(getStartCode(row)); 466 GetResponse.Builder builder = GetResponse.newBuilder(); 467 builder.setResult(resultBuilder.build()); 468 return builder.build(); 469 } 470 471 @Override 472 public MutateResponse mutate(RpcController controller, 473 MutateRequest request) throws ServiceException { 474 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 475 } 476 477 @Override 478 public ScanResponse scan(RpcController controller, 479 ScanRequest request) throws ServiceException { 480 // Presume it is a scan of meta for now. Not all scans provide a region spec expecting 481 // the server to keep reference by scannerid. TODO. 482 return doMetaScanResponse(meta, sequenceids, request); 483 } 484 485 @Override 486 public BulkLoadHFileResponse bulkLoadHFile( 487 RpcController controller, BulkLoadHFileRequest request) 488 throws ServiceException { 489 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 490 } 491 492 @Override 493 public CoprocessorServiceResponse execService( 494 RpcController controller, CoprocessorServiceRequest request) 495 throws ServiceException { 496 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 497 } 498 499 @Override 500 public MultiResponse multi(RpcController controller, MultiRequest request) 501 throws ServiceException { 502 int concurrentInvocations = this.multiInvocationsCount.incrementAndGet(); 503 try { 504 if (concurrentInvocations >= tooManyMultiRequests) { 505 throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" + 506 concurrentInvocations)); 507 } 508 Threads.sleep(multiPause); 509 return doMultiResponse(meta, sequenceids, request); 510 } finally { 511 this.multiInvocationsCount.decrementAndGet(); 512 } 513 } 514 515 @Override 516 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 517 CoprocessorServiceRequest request) throws ServiceException { 518 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 519 } 520 521 @Override 522 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 523 PrepareBulkLoadRequest request) throws ServiceException { 524 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 525 } 526 527 @Override 528 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 529 CleanupBulkLoadRequest request) throws ServiceException { 530 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 531 } 532 } 533 534 static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 535 final AtomicLong sequenceids, final ScanRequest request) { 536 ScanResponse.Builder builder = ScanResponse.newBuilder(); 537 int max = request.getNumberOfRows(); 538 int count = 0; 539 Map<byte [], Pair<HRegionInfo, ServerName>> tail = 540 request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta; 541 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 542 for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) { 543 // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only. 544 if (max <= 0) break; 545 if (++count > max) break; 546 HRegionInfo hri = e.getValue().getFirst(); 547 ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName()); 548 resultBuilder.clear(); 549 resultBuilder.addCell(getRegionInfo(row, hri)); 550 resultBuilder.addCell(getServer(row, e.getValue().getSecond())); 551 resultBuilder.addCell(getStartCode(row)); 552 builder.addResults(resultBuilder.build()); 553 // Set more to false if we are on the last region in table. 554 if (hri.getEndKey().length <= 0) builder.setMoreResults(false); 555 else builder.setMoreResults(true); 556 } 557 // If no scannerid, set one. 558 builder.setScannerId(request.hasScannerId()? 559 request.getScannerId(): sequenceids.incrementAndGet()); 560 return builder.build(); 561 } 562 563 static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 564 final GetRequest request) { 565 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 566 ByteString row = request.getGet().getRow(); 567 Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray()); 568 if (p != null) { 569 resultBuilder.addCell(getRegionInfo(row, p.getFirst())); 570 resultBuilder.addCell(getServer(row, p.getSecond())); 571 } 572 resultBuilder.addCell(getStartCode(row)); 573 GetResponse.Builder builder = GetResponse.newBuilder(); 574 builder.setResult(resultBuilder.build()); 575 return builder.build(); 576 } 577 578 /** 579 * @param name region name or encoded region name. 580 * @param type 581 * @return True if we are dealing with a hbase:meta region. 582 */ 583 static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) { 584 switch (type) { 585 case REGION_NAME: 586 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name); 587 case ENCODED_REGION_NAME: 588 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name); 589 default: throw new UnsupportedOperationException(); 590 } 591 } 592 593 private final static ByteString CATALOG_FAMILY_BYTESTRING = 594 UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY); 595 private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING = 596 UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER); 597 private final static ByteString SERVER_QUALIFIER_BYTESTRING = 598 UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER); 599 600 static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) { 601 CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder(); 602 cellBuilder.setRow(row); 603 cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING); 604 cellBuilder.setTimestamp(System.currentTimeMillis()); 605 return cellBuilder; 606 } 607 608 static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) { 609 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 610 cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING); 611 cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray())); 612 return cellBuilder.build(); 613 } 614 615 static CellProtos.Cell getServer(final ByteString row, final ServerName sn) { 616 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 617 cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING); 618 cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort())); 619 return cellBuilder.build(); 620 } 621 622 static CellProtos.Cell getStartCode(final ByteString row) { 623 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 624 cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER)); 625 // TODO: 626 cellBuilder.setValue(UnsafeByteOperations.unsafeWrap( 627 Bytes.toBytes(META_SERVERNAME.getStartcode()))); 628 return cellBuilder.build(); 629 } 630 631 private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t"); 632 633 /** 634 * Format passed integer. Zero-pad. 635 * Copied from hbase-server PE class and small amendment. Make them share. 636 * @param number 637 * @return Returns zero-prefixed 10-byte wide decimal version of passed 638 * number (Does absolute in case number is negative). 639 */ 640 private static byte [] format(final long number) { 641 byte [] b = new byte[10]; 642 long d = number; 643 for (int i = b.length - 1; i >= 0; i--) { 644 b[i] = (byte)((d % 10) + '0'); 645 d /= 10; 646 } 647 return b; 648 } 649 650 /** 651 * @param count 652 * @param namespaceSpan 653 * @return <code>count</code> regions 654 */ 655 private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count, 656 final long namespaceSpan) { 657 byte [] startKey = HConstants.EMPTY_BYTE_ARRAY; 658 byte [] endKey = HConstants.EMPTY_BYTE_ARRAY; 659 long interval = namespaceSpan / count; 660 HRegionInfo [] hris = new HRegionInfo[count]; 661 for (int i = 0; i < count; i++) { 662 if (i == 0) { 663 endKey = format(interval); 664 } else { 665 startKey = endKey; 666 if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY; 667 else endKey = format((i + 1) * interval); 668 } 669 hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey); 670 } 671 return hris; 672 } 673 674 /** 675 * @param count 676 * @return Return <code>count</code> servernames. 677 */ 678 private static ServerName [] makeServerNames(final int count) { 679 ServerName [] sns = new ServerName[count]; 680 for (int i = 0; i < count; i++) { 681 sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i); 682 } 683 return sns; 684 } 685 686 /** 687 * Comparator for meta row keys. 688 */ 689 private static class MetaRowsComparator implements Comparator<byte []> { 690 private final CellComparatorImpl delegate = CellComparatorImpl.META_COMPARATOR; 691 @Override 692 public int compare(byte[] left, byte[] right) { 693 return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length); 694 } 695 } 696 697 /** 698 * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and 699 * ServerName to return for this row. 700 * @return Map with faked hbase:meta content in it. 701 */ 702 static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName, 703 final int regionCount, final long namespaceSpan, final int serverCount) { 704 // I need a comparator for meta rows so we sort properly. 705 SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta = 706 new ConcurrentSkipListMap<>(new MetaRowsComparator()); 707 HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan); 708 ServerName [] serverNames = makeServerNames(serverCount); 709 int per = regionCount / serverCount; 710 int count = 0; 711 for (HRegionInfo hri: hris) { 712 Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]); 713 meta.put(hri.getRegionName(), p); 714 } 715 return meta; 716 } 717 718 /** 719 * Code for each 'client' to run. 720 * 721 * @param id 722 * @param c 723 * @param sharedConnection 724 * @throws IOException 725 */ 726 static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException { 727 long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); 728 long startTime = System.currentTimeMillis(); 729 final int printInterval = 100000; 730 Random rd = new Random(id); 731 boolean get = c.getBoolean("hbase.test.do.gets", false); 732 TableName tableName = TableName.valueOf(BIG_USER_TABLE); 733 if (get) { 734 try (Table table = sharedConnection.getTable(tableName)){ 735 Stopwatch stopWatch = Stopwatch.createStarted(); 736 for (int i = 0; i < namespaceSpan; i++) { 737 byte [] b = format(rd.nextLong()); 738 Get g = new Get(b); 739 table.get(g); 740 if (i % printInterval == 0) { 741 LOG.info("Get " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 742 stopWatch.reset(); 743 stopWatch.start(); 744 } 745 } 746 LOG.info("Finished a cycle putting " + namespaceSpan + " in " + 747 (System.currentTimeMillis() - startTime) + "ms"); 748 } 749 } else { 750 try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) { 751 Stopwatch stopWatch = Stopwatch.createStarted(); 752 for (int i = 0; i < namespaceSpan; i++) { 753 byte [] b = format(rd.nextLong()); 754 Put p = new Put(b); 755 p.addColumn(HConstants.CATALOG_FAMILY, b, b); 756 mutator.mutate(p); 757 if (i % printInterval == 0) { 758 LOG.info("Put " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 759 stopWatch.reset(); 760 stopWatch.start(); 761 } 762 } 763 LOG.info("Finished a cycle putting " + namespaceSpan + " in " + 764 (System.currentTimeMillis() - startTime) + "ms"); 765 } 766 } 767 } 768 769 @Override 770 public int run(String[] arg0) throws Exception { 771 int errCode = 0; 772 // TODO: Make command options. 773 // How many servers to fake. 774 final int servers = 1; 775 // How many regions to put on the faked servers. 776 final int regions = 100000; 777 // How many 'keys' in the faked regions. 778 final long namespaceSpan = 50000000; 779 // How long to take to pause after doing a put; make this long if you want to fake a struggling 780 // server. 781 final long multiPause = 0; 782 // Check args make basic sense. 783 if ((namespaceSpan < regions) || (regions < servers)) { 784 throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" + 785 regions + " which must be > servers=" + servers); 786 } 787 788 // Set my many servers and many regions faking connection in place. 789 getConf().set("hbase.client.connection.impl", 790 ManyServersManyRegionsConnection.class.getName()); 791 // Use simple kv registry rather than zk 792 getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 793 // When to report fails. Default is we report the 10th. This means we'll see log everytime 794 // an exception is thrown -- usually RegionTooBusyException when we have more than 795 // hbase.test.multi.too.many requests outstanding at any time. 796 getConf().setInt("hbase.client.start.log.errors.counter", 0); 797 798 // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class. 799 getConf().setInt("hbase.test.regions", regions); 800 getConf().setLong("hbase.test.namespace.span", namespaceSpan); 801 getConf().setLong("hbase.test.servers", servers); 802 getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE)); 803 getConf().setLong("hbase.test.multi.pause.when.done", multiPause); 804 // Let there be ten outstanding requests at a time before we throw RegionBusyException. 805 getConf().setInt("hbase.test.multi.too.many", 10); 806 final int clients = 2; 807 808 // Have them all share the same connection so they all share the same instance of 809 // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server. 810 final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p")); 811 // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p")); 812 // Share a connection so I can keep counts in the 'server' on concurrency. 813 final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/); 814 try { 815 Thread [] ts = new Thread[clients]; 816 for (int j = 0; j < ts.length; j++) { 817 final int id = j; 818 ts[j] = new Thread("" + j) { 819 final Configuration c = getConf(); 820 821 @Override 822 public void run() { 823 try { 824 cycle(id, c, sharedConnection); 825 } catch (IOException e) { 826 e.printStackTrace(); 827 } 828 } 829 }; 830 ts[j].start(); 831 } 832 for (int j = 0; j < ts.length; j++) { 833 ts[j].join(); 834 } 835 } finally { 836 sharedConnection.close(); 837 } 838 return errCode; 839 } 840 841 /** 842 * Run a client instance against a faked up server. 843 * @param args TODO 844 * @throws Exception 845 */ 846 public static void main(String[] args) throws Exception { 847 System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args)); 848 } 849}