001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertTrue; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.net.SocketTimeoutException; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Objects; 029import java.util.Random; 030import java.util.SortedMap; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ConcurrentSkipListMap; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.commons.lang3.NotImplementedException; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.conf.Configured; 040import org.apache.hadoop.hbase.CellComparatorImpl; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.HRegionInfo; 046import org.apache.hadoop.hbase.HRegionLocation; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.MetaTableAccessor; 049import org.apache.hadoop.hbase.RegionLocations; 050import org.apache.hadoop.hbase.RegionTooBusyException; 051import org.apache.hadoop.hbase.ServerName; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 054import org.apache.hadoop.hbase.security.User; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.SmallTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.Pair; 059import org.apache.hadoop.hbase.util.Threads; 060import org.apache.hadoop.util.Tool; 061import org.apache.hadoop.util.ToolRunner; 062import org.junit.Before; 063import org.junit.ClassRule; 064import org.junit.Ignore; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.mockito.Mockito; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch; 072import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 073import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 074import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 075import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 076 077import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 101 102/** 103 * Test client behavior w/o setting up a cluster. 104 * Mock up cluster emissions. 105 */ 106@Category({ClientTests.class, SmallTests.class}) 107public class TestClientNoCluster extends Configured implements Tool { 108 109 @ClassRule 110 public static final HBaseClassTestRule CLASS_RULE = 111 HBaseClassTestRule.forClass(TestClientNoCluster.class); 112 113 private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class); 114 private Configuration conf; 115 public static final ServerName META_SERVERNAME = 116 ServerName.valueOf("meta.example.org", 16010, 12345); 117 118 @Before 119 public void setUp() throws Exception { 120 this.conf = HBaseConfiguration.create(); 121 // Run my Connection overrides. Use my little ConnectionImplementation below which 122 // allows me insert mocks and also use my Registry below rather than the default zk based 123 // one so tests run faster and don't have zk dependency. 124 this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 125 } 126 127 /** 128 * Simple cluster registry inserted in place of our usual zookeeper based one. 129 */ 130 static class SimpleRegistry extends DoNothingConnectionRegistry { 131 final ServerName META_HOST = META_SERVERNAME; 132 133 public SimpleRegistry(Configuration conf) { 134 super(conf); 135 } 136 137 @Override 138 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 139 return CompletableFuture.completedFuture(new RegionLocations( 140 new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST))); 141 } 142 143 @Override 144 public CompletableFuture<String> getClusterId() { 145 return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT); 146 } 147 } 148 149 /** 150 * Remove the @Ignore to try out timeout and retry asettings 151 * @throws IOException 152 */ 153 @Ignore 154 @Test 155 public void testTimeoutAndRetries() throws IOException { 156 Configuration localConfig = HBaseConfiguration.create(this.conf); 157 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 158 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 159 Connection connection = ConnectionFactory.createConnection(localConfig); 160 Table table = connection.getTable(TableName.META_TABLE_NAME); 161 Throwable t = null; 162 LOG.info("Start"); 163 try { 164 // An exists call turns into a get w/ a flag. 165 table.exists(new Get(Bytes.toBytes("abc"))); 166 } catch (SocketTimeoutException e) { 167 // I expect this exception. 168 LOG.info("Got expected exception", e); 169 t = e; 170 } catch (RetriesExhaustedException e) { 171 // This is the old, unwanted behavior. If we get here FAIL!!! 172 fail(); 173 } finally { 174 table.close(); 175 } 176 connection.close(); 177 LOG.info("Stop"); 178 assertTrue(t != null); 179 } 180 181 /** 182 * Test that operation timeout prevails over rpc default timeout and retries, etc. 183 * @throws IOException 184 */ 185 @Test 186 public void testRpcTimeout() throws IOException { 187 Configuration localConfig = HBaseConfiguration.create(this.conf); 188 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 189 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 190 int pause = 10; 191 localConfig.setInt("hbase.client.pause", pause); 192 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10); 193 // Set the operation timeout to be < the pause. Expectation is that after first pause, we will 194 // fail out of the rpc because the rpc timeout will have been set to the operation tiemout 195 // and it has expired. Otherwise, if this functionality is broke, all retries will be run -- 196 // all ten of them -- and we'll get the RetriesExhaustedException exception. 197 localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1); 198 Connection connection = ConnectionFactory.createConnection(localConfig); 199 Table table = connection.getTable(TableName.META_TABLE_NAME); 200 Throwable t = null; 201 try { 202 // An exists call turns into a get w/ a flag. 203 table.exists(new Get(Bytes.toBytes("abc"))); 204 } catch (SocketTimeoutException e) { 205 // I expect this exception. 206 LOG.info("Got expected exception", e); 207 t = e; 208 } catch (RetriesExhaustedException e) { 209 // This is the old, unwanted behavior. If we get here FAIL!!! 210 fail(); 211 } finally { 212 table.close(); 213 connection.close(); 214 } 215 assertTrue(t != null); 216 } 217 218 @Test 219 public void testDoNotRetryMetaTableAccessor() throws IOException { 220 this.conf.set("hbase.client.connection.impl", 221 RegionServerStoppedOnScannerOpenConnection.class.getName()); 222 try (Connection connection = ConnectionFactory.createConnection(conf)) { 223 MetaTableAccessor.fullScanRegions(connection); 224 } 225 } 226 227 @Test 228 public void testDoNotRetryOnScanNext() throws IOException { 229 this.conf.set("hbase.client.connection.impl", 230 RegionServerStoppedOnScannerOpenConnection.class.getName()); 231 // Go against meta else we will try to find first region for the table on construction which 232 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 233 // good for a bit of testing. 234 Connection connection = ConnectionFactory.createConnection(this.conf); 235 Table table = connection.getTable(TableName.META_TABLE_NAME); 236 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 237 try { 238 Result result = null; 239 while ((result = scanner.next()) != null) { 240 LOG.info(Objects.toString(result)); 241 } 242 } finally { 243 scanner.close(); 244 table.close(); 245 connection.close(); 246 } 247 } 248 249 @Test 250 public void testRegionServerStoppedOnScannerOpen() throws IOException { 251 this.conf.set("hbase.client.connection.impl", 252 RegionServerStoppedOnScannerOpenConnection.class.getName()); 253 // Go against meta else we will try to find first region for the table on construction which 254 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 255 // good for a bit of testing. 256 Connection connection = ConnectionFactory.createConnection(conf); 257 Table table = connection.getTable(TableName.META_TABLE_NAME); 258 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 259 try { 260 Result result = null; 261 while ((result = scanner.next()) != null) { 262 LOG.info(Objects.toString(result)); 263 } 264 } finally { 265 scanner.close(); 266 table.close(); 267 connection.close(); 268 } 269 } 270 271 @Test 272 public void testConnectionClosedOnRegionLocate() throws IOException { 273 Configuration testConf = new Configuration(this.conf); 274 testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 275 // Go against meta else we will try to find first region for the table on construction which 276 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 277 // good for a bit of testing. 278 Connection connection = ConnectionFactory.createConnection(testConf); 279 Table table = connection.getTable(TableName.META_TABLE_NAME); 280 connection.close(); 281 try { 282 Get get = new Get(Bytes.toBytes("dummyRow")); 283 table.get(get); 284 fail("Should have thrown DoNotRetryException but no exception thrown"); 285 } catch (Exception e) { 286 if (!(e instanceof DoNotRetryIOException)) { 287 String errMsg = 288 "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName(); 289 LOG.error(errMsg, e); 290 fail(errMsg); 291 } 292 } finally { 293 table.close(); 294 } 295 } 296 297 /** 298 * Override to shutdown going to zookeeper for cluster id and meta location. 299 */ 300 static class RegionServerStoppedOnScannerOpenConnection 301 extends ConnectionImplementation { 302 final ClientService.BlockingInterface stub; 303 304 RegionServerStoppedOnScannerOpenConnection(Configuration conf, 305 ExecutorService pool, User user) throws IOException { 306 super(conf, pool, user); 307 // Mock up my stub so open scanner returns a scanner id and then on next, we throw 308 // exceptions for three times and then after that, we return no more to scan. 309 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 310 long sid = 12345L; 311 try { 312 Mockito.when(stub.scan((RpcController)Mockito.any(), 313 (ClientProtos.ScanRequest)Mockito.any())). 314 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()). 315 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))). 316 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid). 317 setMoreResults(false).build()); 318 } catch (ServiceException e) { 319 throw new IOException(e); 320 } 321 } 322 323 @Override 324 public BlockingInterface getClient(ServerName sn) throws IOException { 325 return this.stub; 326 } 327 } 328 329 /** 330 * Override to check we are setting rpc timeout right. 331 */ 332 static class RpcTimeoutConnection 333 extends ConnectionImplementation { 334 final ClientService.BlockingInterface stub; 335 336 RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user) 337 throws IOException { 338 super(conf, pool, user); 339 // Mock up my stub so an exists call -- which turns into a get -- throws an exception 340 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 341 try { 342 Mockito.when(stub.get((RpcController)Mockito.any(), 343 (ClientProtos.GetRequest)Mockito.any())). 344 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))); 345 } catch (ServiceException e) { 346 throw new IOException(e); 347 } 348 } 349 350 @Override 351 public BlockingInterface getClient(ServerName sn) throws IOException { 352 return this.stub; 353 } 354 } 355 356 /** 357 * Fake many regionservers and many regions on a connection implementation. 358 */ 359 static class ManyServersManyRegionsConnection 360 extends ConnectionImplementation { 361 // All access should be synchronized 362 final Map<ServerName, ClientService.BlockingInterface> serversByClient; 363 364 /** 365 * Map of faked-up rows of a 'meta table'. 366 */ 367 final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta; 368 final AtomicLong sequenceids = new AtomicLong(0); 369 private final Configuration conf; 370 371 ManyServersManyRegionsConnection(Configuration conf, 372 ExecutorService pool, User user) 373 throws IOException { 374 super(conf, pool, user); 375 int serverCount = conf.getInt("hbase.test.servers", 10); 376 this.serversByClient = new HashMap<>(serverCount); 377 this.meta = makeMeta(Bytes.toBytes( 378 conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))), 379 conf.getInt("hbase.test.regions", 100), 380 conf.getLong("hbase.test.namespace.span", 1000), 381 serverCount); 382 this.conf = conf; 383 } 384 385 @Override 386 public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { 387 // if (!sn.toString().startsWith("meta")) LOG.info(sn); 388 ClientService.BlockingInterface stub = null; 389 synchronized (this.serversByClient) { 390 stub = this.serversByClient.get(sn); 391 if (stub == null) { 392 stub = new FakeServer(this.conf, meta, sequenceids); 393 this.serversByClient.put(sn, stub); 394 } 395 } 396 return stub; 397 } 398 } 399 400 static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 401 final AtomicLong sequenceids, final MultiRequest request) { 402 // Make a response to match the request. Act like there were no failures. 403 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); 404 // Per Region. 405 RegionActionResult.Builder regionActionResultBuilder = 406 RegionActionResult.newBuilder(); 407 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 408 for (RegionAction regionAction: request.getRegionActionList()) { 409 regionActionResultBuilder.clear(); 410 // Per Action in a Region. 411 for (ClientProtos.Action action: regionAction.getActionList()) { 412 roeBuilder.clear(); 413 // Return empty Result and proper index as result. 414 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 415 roeBuilder.setIndex(action.getIndex()); 416 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 417 } 418 builder.addRegionActionResult(regionActionResultBuilder.build()); 419 } 420 return builder.build(); 421 } 422 423 /** 424 * Fake 'server'. 425 * Implements the ClientService responding as though it were a 'server' (presumes a new 426 * ClientService.BlockingInterface made per server). 427 */ 428 static class FakeServer implements ClientService.BlockingInterface { 429 private AtomicInteger multiInvocationsCount = new AtomicInteger(0); 430 private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta; 431 private final AtomicLong sequenceids; 432 private final long multiPause; 433 private final int tooManyMultiRequests; 434 435 FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 436 final AtomicLong sequenceids) { 437 this.meta = meta; 438 this.sequenceids = sequenceids; 439 440 // Pause to simulate the server taking time applying the edits. This will drive up the 441 // number of threads used over in client. 442 this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0); 443 this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3); 444 } 445 446 @Override 447 public GetResponse get(RpcController controller, GetRequest request) 448 throws ServiceException { 449 boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(), 450 request.getRegion().getType()); 451 if (!metaRegion) { 452 return doGetResponse(request); 453 } 454 return doMetaGetResponse(meta, request); 455 } 456 457 private GetResponse doGetResponse(GetRequest request) { 458 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 459 ByteString row = request.getGet().getRow(); 460 resultBuilder.addCell(getStartCode(row)); 461 GetResponse.Builder builder = GetResponse.newBuilder(); 462 builder.setResult(resultBuilder.build()); 463 return builder.build(); 464 } 465 466 @Override 467 public MutateResponse mutate(RpcController controller, 468 MutateRequest request) throws ServiceException { 469 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 470 } 471 472 @Override 473 public ScanResponse scan(RpcController controller, 474 ScanRequest request) throws ServiceException { 475 // Presume it is a scan of meta for now. Not all scans provide a region spec expecting 476 // the server to keep reference by scannerid. TODO. 477 return doMetaScanResponse(meta, sequenceids, request); 478 } 479 480 @Override 481 public BulkLoadHFileResponse bulkLoadHFile( 482 RpcController controller, BulkLoadHFileRequest request) 483 throws ServiceException { 484 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 485 } 486 487 @Override 488 public CoprocessorServiceResponse execService( 489 RpcController controller, CoprocessorServiceRequest request) 490 throws ServiceException { 491 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 492 } 493 494 @Override 495 public MultiResponse multi(RpcController controller, MultiRequest request) 496 throws ServiceException { 497 int concurrentInvocations = this.multiInvocationsCount.incrementAndGet(); 498 try { 499 if (concurrentInvocations >= tooManyMultiRequests) { 500 throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" + 501 concurrentInvocations)); 502 } 503 Threads.sleep(multiPause); 504 return doMultiResponse(meta, sequenceids, request); 505 } finally { 506 this.multiInvocationsCount.decrementAndGet(); 507 } 508 } 509 510 @Override 511 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 512 CoprocessorServiceRequest request) throws ServiceException { 513 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 514 } 515 516 @Override 517 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 518 PrepareBulkLoadRequest request) throws ServiceException { 519 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 520 } 521 522 @Override 523 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 524 CleanupBulkLoadRequest request) throws ServiceException { 525 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 526 } 527 } 528 529 static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 530 final AtomicLong sequenceids, final ScanRequest request) { 531 ScanResponse.Builder builder = ScanResponse.newBuilder(); 532 int max = request.getNumberOfRows(); 533 int count = 0; 534 Map<byte [], Pair<HRegionInfo, ServerName>> tail = 535 request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta; 536 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 537 for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) { 538 // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only. 539 if (max <= 0) break; 540 if (++count > max) break; 541 HRegionInfo hri = e.getValue().getFirst(); 542 ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName()); 543 resultBuilder.clear(); 544 resultBuilder.addCell(getRegionInfo(row, hri)); 545 resultBuilder.addCell(getServer(row, e.getValue().getSecond())); 546 resultBuilder.addCell(getStartCode(row)); 547 builder.addResults(resultBuilder.build()); 548 // Set more to false if we are on the last region in table. 549 if (hri.getEndKey().length <= 0) builder.setMoreResults(false); 550 else builder.setMoreResults(true); 551 } 552 // If no scannerid, set one. 553 builder.setScannerId(request.hasScannerId()? 554 request.getScannerId(): sequenceids.incrementAndGet()); 555 return builder.build(); 556 } 557 558 static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 559 final GetRequest request) { 560 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 561 ByteString row = request.getGet().getRow(); 562 Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray()); 563 if (p != null) { 564 resultBuilder.addCell(getRegionInfo(row, p.getFirst())); 565 resultBuilder.addCell(getServer(row, p.getSecond())); 566 } 567 resultBuilder.addCell(getStartCode(row)); 568 GetResponse.Builder builder = GetResponse.newBuilder(); 569 builder.setResult(resultBuilder.build()); 570 return builder.build(); 571 } 572 573 /** 574 * @param name region name or encoded region name. 575 * @param type 576 * @return True if we are dealing with a hbase:meta region. 577 */ 578 static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) { 579 switch (type) { 580 case REGION_NAME: 581 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name); 582 case ENCODED_REGION_NAME: 583 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name); 584 default: throw new UnsupportedOperationException(); 585 } 586 } 587 588 private final static ByteString CATALOG_FAMILY_BYTESTRING = 589 UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY); 590 private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING = 591 UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER); 592 private final static ByteString SERVER_QUALIFIER_BYTESTRING = 593 UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER); 594 595 static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) { 596 CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder(); 597 cellBuilder.setRow(row); 598 cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING); 599 cellBuilder.setTimestamp(System.currentTimeMillis()); 600 return cellBuilder; 601 } 602 603 static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) { 604 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 605 cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING); 606 cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray())); 607 return cellBuilder.build(); 608 } 609 610 static CellProtos.Cell getServer(final ByteString row, final ServerName sn) { 611 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 612 cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING); 613 cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort())); 614 return cellBuilder.build(); 615 } 616 617 static CellProtos.Cell getStartCode(final ByteString row) { 618 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 619 cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER)); 620 // TODO: 621 cellBuilder.setValue(UnsafeByteOperations.unsafeWrap( 622 Bytes.toBytes(META_SERVERNAME.getStartcode()))); 623 return cellBuilder.build(); 624 } 625 626 private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t"); 627 628 /** 629 * Format passed integer. Zero-pad. 630 * Copied from hbase-server PE class and small amendment. Make them share. 631 * @param number 632 * @return Returns zero-prefixed 10-byte wide decimal version of passed 633 * number (Does absolute in case number is negative). 634 */ 635 private static byte [] format(final long number) { 636 byte [] b = new byte[10]; 637 long d = number; 638 for (int i = b.length - 1; i >= 0; i--) { 639 b[i] = (byte)((d % 10) + '0'); 640 d /= 10; 641 } 642 return b; 643 } 644 645 /** 646 * @param count 647 * @param namespaceSpan 648 * @return <code>count</code> regions 649 */ 650 private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count, 651 final long namespaceSpan) { 652 byte [] startKey = HConstants.EMPTY_BYTE_ARRAY; 653 byte [] endKey = HConstants.EMPTY_BYTE_ARRAY; 654 long interval = namespaceSpan / count; 655 HRegionInfo [] hris = new HRegionInfo[count]; 656 for (int i = 0; i < count; i++) { 657 if (i == 0) { 658 endKey = format(interval); 659 } else { 660 startKey = endKey; 661 if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY; 662 else endKey = format((i + 1) * interval); 663 } 664 hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey); 665 } 666 return hris; 667 } 668 669 /** 670 * @param count 671 * @return Return <code>count</code> servernames. 672 */ 673 private static ServerName [] makeServerNames(final int count) { 674 ServerName [] sns = new ServerName[count]; 675 for (int i = 0; i < count; i++) { 676 sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i); 677 } 678 return sns; 679 } 680 681 /** 682 * Comparator for meta row keys. 683 */ 684 private static class MetaRowsComparator implements Comparator<byte []> { 685 private final CellComparatorImpl delegate = CellComparatorImpl.META_COMPARATOR; 686 @Override 687 public int compare(byte[] left, byte[] right) { 688 return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length); 689 } 690 } 691 692 /** 693 * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and 694 * ServerName to return for this row. 695 * @return Map with faked hbase:meta content in it. 696 */ 697 static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName, 698 final int regionCount, final long namespaceSpan, final int serverCount) { 699 // I need a comparator for meta rows so we sort properly. 700 SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta = 701 new ConcurrentSkipListMap<>(new MetaRowsComparator()); 702 HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan); 703 ServerName [] serverNames = makeServerNames(serverCount); 704 int per = regionCount / serverCount; 705 int count = 0; 706 for (HRegionInfo hri: hris) { 707 Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]); 708 meta.put(hri.getRegionName(), p); 709 } 710 return meta; 711 } 712 713 /** 714 * Code for each 'client' to run. 715 * 716 * @param id 717 * @param c 718 * @param sharedConnection 719 * @throws IOException 720 */ 721 static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException { 722 long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); 723 long startTime = System.currentTimeMillis(); 724 final int printInterval = 100000; 725 Random rd = new Random(id); 726 boolean get = c.getBoolean("hbase.test.do.gets", false); 727 TableName tableName = TableName.valueOf(BIG_USER_TABLE); 728 if (get) { 729 try (Table table = sharedConnection.getTable(tableName)){ 730 Stopwatch stopWatch = Stopwatch.createStarted(); 731 for (int i = 0; i < namespaceSpan; i++) { 732 byte [] b = format(rd.nextLong()); 733 Get g = new Get(b); 734 table.get(g); 735 if (i % printInterval == 0) { 736 LOG.info("Get " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 737 stopWatch.reset(); 738 stopWatch.start(); 739 } 740 } 741 LOG.info("Finished a cycle putting " + namespaceSpan + " in " + 742 (System.currentTimeMillis() - startTime) + "ms"); 743 } 744 } else { 745 try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) { 746 Stopwatch stopWatch = Stopwatch.createStarted(); 747 for (int i = 0; i < namespaceSpan; i++) { 748 byte [] b = format(rd.nextLong()); 749 Put p = new Put(b); 750 p.addColumn(HConstants.CATALOG_FAMILY, b, b); 751 mutator.mutate(p); 752 if (i % printInterval == 0) { 753 LOG.info("Put " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 754 stopWatch.reset(); 755 stopWatch.start(); 756 } 757 } 758 LOG.info("Finished a cycle putting " + namespaceSpan + " in " + 759 (System.currentTimeMillis() - startTime) + "ms"); 760 } 761 } 762 } 763 764 @Override 765 public int run(String[] arg0) throws Exception { 766 int errCode = 0; 767 // TODO: Make command options. 768 // How many servers to fake. 769 final int servers = 1; 770 // How many regions to put on the faked servers. 771 final int regions = 100000; 772 // How many 'keys' in the faked regions. 773 final long namespaceSpan = 50000000; 774 // How long to take to pause after doing a put; make this long if you want to fake a struggling 775 // server. 776 final long multiPause = 0; 777 // Check args make basic sense. 778 if ((namespaceSpan < regions) || (regions < servers)) { 779 throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" + 780 regions + " which must be > servers=" + servers); 781 } 782 783 // Set my many servers and many regions faking connection in place. 784 getConf().set("hbase.client.connection.impl", 785 ManyServersManyRegionsConnection.class.getName()); 786 // Use simple kv registry rather than zk 787 getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 788 // When to report fails. Default is we report the 10th. This means we'll see log everytime 789 // an exception is thrown -- usually RegionTooBusyException when we have more than 790 // hbase.test.multi.too.many requests outstanding at any time. 791 getConf().setInt("hbase.client.start.log.errors.counter", 0); 792 793 // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class. 794 getConf().setInt("hbase.test.regions", regions); 795 getConf().setLong("hbase.test.namespace.span", namespaceSpan); 796 getConf().setLong("hbase.test.servers", servers); 797 getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE)); 798 getConf().setLong("hbase.test.multi.pause.when.done", multiPause); 799 // Let there be ten outstanding requests at a time before we throw RegionBusyException. 800 getConf().setInt("hbase.test.multi.too.many", 10); 801 final int clients = 2; 802 803 // Have them all share the same connection so they all share the same instance of 804 // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server. 805 final ExecutorService pool = Executors.newCachedThreadPool( 806 Threads.newDaemonThreadFactory("p")); 807 // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p")); 808 // Share a connection so I can keep counts in the 'server' on concurrency. 809 final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/); 810 try { 811 Thread [] ts = new Thread[clients]; 812 for (int j = 0; j < ts.length; j++) { 813 final int id = j; 814 ts[j] = new Thread("" + j) { 815 final Configuration c = getConf(); 816 817 @Override 818 public void run() { 819 try { 820 cycle(id, c, sharedConnection); 821 } catch (IOException e) { 822 e.printStackTrace(); 823 } 824 } 825 }; 826 ts[j].start(); 827 } 828 for (int j = 0; j < ts.length; j++) { 829 ts[j].join(); 830 } 831 } finally { 832 sharedConnection.close(); 833 } 834 return errCode; 835 } 836 837 /** 838 * Run a client instance against a faked up server. 839 * @param args TODO 840 * @throws Exception 841 */ 842 public static void main(String[] args) throws Exception { 843 System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args)); 844 } 845}