001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertTrue; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.net.SocketTimeoutException; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Objects; 029import java.util.Random; 030import java.util.SortedMap; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ConcurrentSkipListMap; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.commons.lang3.NotImplementedException; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.conf.Configured; 040import org.apache.hadoop.hbase.CellComparatorImpl; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.HRegionInfo; 046import org.apache.hadoop.hbase.HRegionLocation; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.MetaCellComparator; 049import org.apache.hadoop.hbase.MetaTableAccessor; 050import org.apache.hadoop.hbase.RegionLocations; 051import org.apache.hadoop.hbase.RegionTooBusyException; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 055import org.apache.hadoop.hbase.security.User; 056import org.apache.hadoop.hbase.testclassification.ClientTests; 057import org.apache.hadoop.hbase.testclassification.SmallTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.Pair; 060import org.apache.hadoop.hbase.util.Threads; 061import org.apache.hadoop.util.Tool; 062import org.apache.hadoop.util.ToolRunner; 063import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 064import org.junit.Before; 065import org.junit.ClassRule; 066import org.junit.Ignore; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.mockito.Mockito; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch; 074import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 075import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 076import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 077import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 078 079import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 103 104/** 105 * Test client behavior w/o setting up a cluster. 106 * Mock up cluster emissions. 107 */ 108@Category({ClientTests.class, SmallTests.class}) 109public class TestClientNoCluster extends Configured implements Tool { 110 111 @ClassRule 112 public static final HBaseClassTestRule CLASS_RULE = 113 HBaseClassTestRule.forClass(TestClientNoCluster.class); 114 115 private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class); 116 private Configuration conf; 117 public static final ServerName META_SERVERNAME = 118 ServerName.valueOf("meta.example.org", 16010, 12345); 119 120 @Before 121 public void setUp() throws Exception { 122 this.conf = HBaseConfiguration.create(); 123 // Run my Connection overrides. Use my little ConnectionImplementation below which 124 // allows me insert mocks and also use my Registry below rather than the default zk based 125 // one so tests run faster and don't have zk dependency. 126 this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 127 } 128 129 /** 130 * Simple cluster registry inserted in place of our usual zookeeper based one. 131 */ 132 static class SimpleRegistry extends DoNothingConnectionRegistry { 133 final ServerName META_HOST = META_SERVERNAME; 134 135 public SimpleRegistry(Configuration conf) { 136 super(conf); 137 } 138 139 @Override 140 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 141 return CompletableFuture.completedFuture(new RegionLocations( 142 new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST))); 143 } 144 145 @Override 146 public CompletableFuture<String> getClusterId() { 147 return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT); 148 } 149 } 150 151 /** 152 * Remove the @Ignore to try out timeout and retry asettings 153 * @throws IOException 154 */ 155 @Ignore 156 @Test 157 public void testTimeoutAndRetries() throws IOException { 158 Configuration localConfig = HBaseConfiguration.create(this.conf); 159 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 160 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 161 Connection connection = ConnectionFactory.createConnection(localConfig); 162 Table table = connection.getTable(TableName.META_TABLE_NAME); 163 Throwable t = null; 164 LOG.info("Start"); 165 try { 166 // An exists call turns into a get w/ a flag. 167 table.exists(new Get(Bytes.toBytes("abc"))); 168 } catch (SocketTimeoutException e) { 169 // I expect this exception. 170 LOG.info("Got expected exception", e); 171 t = e; 172 } catch (RetriesExhaustedException e) { 173 // This is the old, unwanted behavior. If we get here FAIL!!! 174 fail(); 175 } finally { 176 table.close(); 177 } 178 connection.close(); 179 LOG.info("Stop"); 180 assertTrue(t != null); 181 } 182 183 /** 184 * Test that operation timeout prevails over rpc default timeout and retries, etc. 185 * @throws IOException 186 */ 187 @Test 188 public void testRpcTimeout() throws IOException { 189 Configuration localConfig = HBaseConfiguration.create(this.conf); 190 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 191 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 192 int pause = 10; 193 localConfig.setInt("hbase.client.pause", pause); 194 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10); 195 // Set the operation timeout to be < the pause. Expectation is that after first pause, we will 196 // fail out of the rpc because the rpc timeout will have been set to the operation tiemout 197 // and it has expired. Otherwise, if this functionality is broke, all retries will be run -- 198 // all ten of them -- and we'll get the RetriesExhaustedException exception. 199 localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1); 200 Connection connection = ConnectionFactory.createConnection(localConfig); 201 Table table = connection.getTable(TableName.META_TABLE_NAME); 202 Throwable t = null; 203 try { 204 // An exists call turns into a get w/ a flag. 205 table.exists(new Get(Bytes.toBytes("abc"))); 206 } catch (SocketTimeoutException e) { 207 // I expect this exception. 208 LOG.info("Got expected exception", e); 209 t = e; 210 } catch (RetriesExhaustedException e) { 211 // This is the old, unwanted behavior. If we get here FAIL!!! 212 fail(); 213 } finally { 214 table.close(); 215 connection.close(); 216 } 217 assertTrue(t != null); 218 } 219 220 @Test 221 public void testDoNotRetryMetaTableAccessor() throws IOException { 222 this.conf.set("hbase.client.connection.impl", 223 RegionServerStoppedOnScannerOpenConnection.class.getName()); 224 try (Connection connection = ConnectionFactory.createConnection(conf)) { 225 MetaTableAccessor.fullScanRegions(connection); 226 } 227 } 228 229 @Test 230 public void testDoNotRetryOnScanNext() throws IOException { 231 this.conf.set("hbase.client.connection.impl", 232 RegionServerStoppedOnScannerOpenConnection.class.getName()); 233 // Go against meta else we will try to find first region for the table on construction which 234 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 235 // good for a bit of testing. 236 Connection connection = ConnectionFactory.createConnection(this.conf); 237 Table table = connection.getTable(TableName.META_TABLE_NAME); 238 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 239 try { 240 Result result = null; 241 while ((result = scanner.next()) != null) { 242 LOG.info(Objects.toString(result)); 243 } 244 } finally { 245 scanner.close(); 246 table.close(); 247 connection.close(); 248 } 249 } 250 251 @Test 252 public void testRegionServerStoppedOnScannerOpen() throws IOException { 253 this.conf.set("hbase.client.connection.impl", 254 RegionServerStoppedOnScannerOpenConnection.class.getName()); 255 // Go against meta else we will try to find first region for the table on construction which 256 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 257 // good for a bit of testing. 258 Connection connection = ConnectionFactory.createConnection(conf); 259 Table table = connection.getTable(TableName.META_TABLE_NAME); 260 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 261 try { 262 Result result = null; 263 while ((result = scanner.next()) != null) { 264 LOG.info(Objects.toString(result)); 265 } 266 } finally { 267 scanner.close(); 268 table.close(); 269 connection.close(); 270 } 271 } 272 273 @Test 274 public void testConnectionClosedOnRegionLocate() throws IOException { 275 Configuration testConf = new Configuration(this.conf); 276 testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 277 // Go against meta else we will try to find first region for the table on construction which 278 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 279 // good for a bit of testing. 280 Connection connection = ConnectionFactory.createConnection(testConf); 281 Table table = connection.getTable(TableName.META_TABLE_NAME); 282 connection.close(); 283 try { 284 Get get = new Get(Bytes.toBytes("dummyRow")); 285 table.get(get); 286 fail("Should have thrown DoNotRetryException but no exception thrown"); 287 } catch (Exception e) { 288 if (!(e instanceof DoNotRetryIOException)) { 289 String errMsg = 290 "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName(); 291 LOG.error(errMsg, e); 292 fail(errMsg); 293 } 294 } finally { 295 table.close(); 296 } 297 } 298 299 /** 300 * Override to shutdown going to zookeeper for cluster id and meta location. 301 */ 302 static class RegionServerStoppedOnScannerOpenConnection 303 extends ConnectionImplementation { 304 final ClientService.BlockingInterface stub; 305 306 RegionServerStoppedOnScannerOpenConnection(Configuration conf, 307 ExecutorService pool, User user) throws IOException { 308 super(conf, pool, user); 309 // Mock up my stub so open scanner returns a scanner id and then on next, we throw 310 // exceptions for three times and then after that, we return no more to scan. 311 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 312 long sid = 12345L; 313 try { 314 Mockito.when(stub.scan((RpcController)Mockito.any(), 315 (ClientProtos.ScanRequest)Mockito.any())). 316 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()). 317 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))). 318 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid). 319 setMoreResults(false).build()); 320 } catch (ServiceException e) { 321 throw new IOException(e); 322 } 323 } 324 325 @Override 326 public BlockingInterface getClient(ServerName sn) throws IOException { 327 return this.stub; 328 } 329 } 330 331 /** 332 * Override to check we are setting rpc timeout right. 333 */ 334 static class RpcTimeoutConnection 335 extends ConnectionImplementation { 336 final ClientService.BlockingInterface stub; 337 338 RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user) 339 throws IOException { 340 super(conf, pool, user); 341 // Mock up my stub so an exists call -- which turns into a get -- throws an exception 342 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 343 try { 344 Mockito.when(stub.get((RpcController)Mockito.any(), 345 (ClientProtos.GetRequest)Mockito.any())). 346 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))); 347 } catch (ServiceException e) { 348 throw new IOException(e); 349 } 350 } 351 352 @Override 353 public BlockingInterface getClient(ServerName sn) throws IOException { 354 return this.stub; 355 } 356 } 357 358 /** 359 * Fake many regionservers and many regions on a connection implementation. 360 */ 361 static class ManyServersManyRegionsConnection 362 extends ConnectionImplementation { 363 // All access should be synchronized 364 final Map<ServerName, ClientService.BlockingInterface> serversByClient; 365 366 /** 367 * Map of faked-up rows of a 'meta table'. 368 */ 369 final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta; 370 final AtomicLong sequenceids = new AtomicLong(0); 371 private final Configuration conf; 372 373 ManyServersManyRegionsConnection(Configuration conf, 374 ExecutorService pool, User user) 375 throws IOException { 376 super(conf, pool, user); 377 int serverCount = conf.getInt("hbase.test.servers", 10); 378 this.serversByClient = new HashMap<>(serverCount); 379 this.meta = makeMeta(Bytes.toBytes( 380 conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))), 381 conf.getInt("hbase.test.regions", 100), 382 conf.getLong("hbase.test.namespace.span", 1000), 383 serverCount); 384 this.conf = conf; 385 } 386 387 @Override 388 public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { 389 // if (!sn.toString().startsWith("meta")) LOG.info(sn); 390 ClientService.BlockingInterface stub = null; 391 synchronized (this.serversByClient) { 392 stub = this.serversByClient.get(sn); 393 if (stub == null) { 394 stub = new FakeServer(this.conf, meta, sequenceids); 395 this.serversByClient.put(sn, stub); 396 } 397 } 398 return stub; 399 } 400 } 401 402 static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 403 final AtomicLong sequenceids, final MultiRequest request) { 404 // Make a response to match the request. Act like there were no failures. 405 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); 406 // Per Region. 407 RegionActionResult.Builder regionActionResultBuilder = 408 RegionActionResult.newBuilder(); 409 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 410 for (RegionAction regionAction: request.getRegionActionList()) { 411 regionActionResultBuilder.clear(); 412 // Per Action in a Region. 413 for (ClientProtos.Action action: regionAction.getActionList()) { 414 roeBuilder.clear(); 415 // Return empty Result and proper index as result. 416 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 417 roeBuilder.setIndex(action.getIndex()); 418 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 419 } 420 builder.addRegionActionResult(regionActionResultBuilder.build()); 421 } 422 return builder.build(); 423 } 424 425 /** 426 * Fake 'server'. 427 * Implements the ClientService responding as though it were a 'server' (presumes a new 428 * ClientService.BlockingInterface made per server). 429 */ 430 static class FakeServer implements ClientService.BlockingInterface { 431 private AtomicInteger multiInvocationsCount = new AtomicInteger(0); 432 private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta; 433 private final AtomicLong sequenceids; 434 private final long multiPause; 435 private final int tooManyMultiRequests; 436 437 FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 438 final AtomicLong sequenceids) { 439 this.meta = meta; 440 this.sequenceids = sequenceids; 441 442 // Pause to simulate the server taking time applying the edits. This will drive up the 443 // number of threads used over in client. 444 this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0); 445 this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3); 446 } 447 448 @Override 449 public GetResponse get(RpcController controller, GetRequest request) 450 throws ServiceException { 451 boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(), 452 request.getRegion().getType()); 453 if (!metaRegion) { 454 return doGetResponse(request); 455 } 456 return doMetaGetResponse(meta, request); 457 } 458 459 private GetResponse doGetResponse(GetRequest request) { 460 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 461 ByteString row = request.getGet().getRow(); 462 resultBuilder.addCell(getStartCode(row)); 463 GetResponse.Builder builder = GetResponse.newBuilder(); 464 builder.setResult(resultBuilder.build()); 465 return builder.build(); 466 } 467 468 @Override 469 public MutateResponse mutate(RpcController controller, 470 MutateRequest request) throws ServiceException { 471 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 472 } 473 474 @Override 475 public ScanResponse scan(RpcController controller, 476 ScanRequest request) throws ServiceException { 477 // Presume it is a scan of meta for now. Not all scans provide a region spec expecting 478 // the server to keep reference by scannerid. TODO. 479 return doMetaScanResponse(meta, sequenceids, request); 480 } 481 482 @Override 483 public BulkLoadHFileResponse bulkLoadHFile( 484 RpcController controller, BulkLoadHFileRequest request) 485 throws ServiceException { 486 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 487 } 488 489 @Override 490 public CoprocessorServiceResponse execService( 491 RpcController controller, CoprocessorServiceRequest request) 492 throws ServiceException { 493 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 494 } 495 496 @Override 497 public MultiResponse multi(RpcController controller, MultiRequest request) 498 throws ServiceException { 499 int concurrentInvocations = this.multiInvocationsCount.incrementAndGet(); 500 try { 501 if (concurrentInvocations >= tooManyMultiRequests) { 502 throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" + 503 concurrentInvocations)); 504 } 505 Threads.sleep(multiPause); 506 return doMultiResponse(meta, sequenceids, request); 507 } finally { 508 this.multiInvocationsCount.decrementAndGet(); 509 } 510 } 511 512 @Override 513 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 514 CoprocessorServiceRequest request) throws ServiceException { 515 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 516 } 517 518 @Override 519 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 520 PrepareBulkLoadRequest request) throws ServiceException { 521 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 522 } 523 524 @Override 525 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 526 CleanupBulkLoadRequest request) throws ServiceException { 527 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 528 } 529 } 530 531 static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 532 final AtomicLong sequenceids, final ScanRequest request) { 533 ScanResponse.Builder builder = ScanResponse.newBuilder(); 534 int max = request.getNumberOfRows(); 535 int count = 0; 536 Map<byte [], Pair<HRegionInfo, ServerName>> tail = 537 request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta; 538 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 539 for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) { 540 // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only. 541 if (max <= 0) break; 542 if (++count > max) break; 543 HRegionInfo hri = e.getValue().getFirst(); 544 ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName()); 545 resultBuilder.clear(); 546 resultBuilder.addCell(getRegionInfo(row, hri)); 547 resultBuilder.addCell(getServer(row, e.getValue().getSecond())); 548 resultBuilder.addCell(getStartCode(row)); 549 builder.addResults(resultBuilder.build()); 550 // Set more to false if we are on the last region in table. 551 if (hri.getEndKey().length <= 0) builder.setMoreResults(false); 552 else builder.setMoreResults(true); 553 } 554 // If no scannerid, set one. 555 builder.setScannerId(request.hasScannerId()? 556 request.getScannerId(): sequenceids.incrementAndGet()); 557 return builder.build(); 558 } 559 560 static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, 561 final GetRequest request) { 562 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 563 ByteString row = request.getGet().getRow(); 564 Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray()); 565 if (p != null) { 566 resultBuilder.addCell(getRegionInfo(row, p.getFirst())); 567 resultBuilder.addCell(getServer(row, p.getSecond())); 568 } 569 resultBuilder.addCell(getStartCode(row)); 570 GetResponse.Builder builder = GetResponse.newBuilder(); 571 builder.setResult(resultBuilder.build()); 572 return builder.build(); 573 } 574 575 /** 576 * @param name region name or encoded region name. 577 * @param type 578 * @return True if we are dealing with a hbase:meta region. 579 */ 580 static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) { 581 switch (type) { 582 case REGION_NAME: 583 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name); 584 case ENCODED_REGION_NAME: 585 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name); 586 default: throw new UnsupportedOperationException(); 587 } 588 } 589 590 private final static ByteString CATALOG_FAMILY_BYTESTRING = 591 UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY); 592 private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING = 593 UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER); 594 private final static ByteString SERVER_QUALIFIER_BYTESTRING = 595 UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER); 596 597 static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) { 598 CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder(); 599 cellBuilder.setRow(row); 600 cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING); 601 cellBuilder.setTimestamp(System.currentTimeMillis()); 602 return cellBuilder; 603 } 604 605 static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) { 606 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 607 cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING); 608 cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray())); 609 return cellBuilder.build(); 610 } 611 612 static CellProtos.Cell getServer(final ByteString row, final ServerName sn) { 613 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 614 cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING); 615 cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort())); 616 return cellBuilder.build(); 617 } 618 619 static CellProtos.Cell getStartCode(final ByteString row) { 620 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 621 cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER)); 622 // TODO: 623 cellBuilder.setValue(UnsafeByteOperations.unsafeWrap( 624 Bytes.toBytes(META_SERVERNAME.getStartcode()))); 625 return cellBuilder.build(); 626 } 627 628 private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t"); 629 630 /** 631 * Format passed integer. Zero-pad. 632 * Copied from hbase-server PE class and small amendment. Make them share. 633 * @param number 634 * @return Returns zero-prefixed 10-byte wide decimal version of passed 635 * number (Does absolute in case number is negative). 636 */ 637 private static byte [] format(final long number) { 638 byte [] b = new byte[10]; 639 long d = number; 640 for (int i = b.length - 1; i >= 0; i--) { 641 b[i] = (byte)((d % 10) + '0'); 642 d /= 10; 643 } 644 return b; 645 } 646 647 /** 648 * @param count 649 * @param namespaceSpan 650 * @return <code>count</code> regions 651 */ 652 private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count, 653 final long namespaceSpan) { 654 byte [] startKey = HConstants.EMPTY_BYTE_ARRAY; 655 byte [] endKey = HConstants.EMPTY_BYTE_ARRAY; 656 long interval = namespaceSpan / count; 657 HRegionInfo [] hris = new HRegionInfo[count]; 658 for (int i = 0; i < count; i++) { 659 if (i == 0) { 660 endKey = format(interval); 661 } else { 662 startKey = endKey; 663 if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY; 664 else endKey = format((i + 1) * interval); 665 } 666 hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey); 667 } 668 return hris; 669 } 670 671 /** 672 * @param count 673 * @return Return <code>count</code> servernames. 674 */ 675 private static ServerName [] makeServerNames(final int count) { 676 ServerName [] sns = new ServerName[count]; 677 for (int i = 0; i < count; i++) { 678 sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i); 679 } 680 return sns; 681 } 682 683 /** 684 * Comparator for meta row keys. 685 */ 686 private static class MetaRowsComparator implements Comparator<byte []> { 687 private final CellComparatorImpl delegate = MetaCellComparator.META_COMPARATOR; 688 @Override 689 public int compare(byte[] left, byte[] right) { 690 return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length); 691 } 692 } 693 694 /** 695 * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and 696 * ServerName to return for this row. 697 * @return Map with faked hbase:meta content in it. 698 */ 699 static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName, 700 final int regionCount, final long namespaceSpan, final int serverCount) { 701 // I need a comparator for meta rows so we sort properly. 702 SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta = 703 new ConcurrentSkipListMap<>(new MetaRowsComparator()); 704 HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan); 705 ServerName [] serverNames = makeServerNames(serverCount); 706 int per = regionCount / serverCount; 707 int count = 0; 708 for (HRegionInfo hri: hris) { 709 Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]); 710 meta.put(hri.getRegionName(), p); 711 } 712 return meta; 713 } 714 715 /** 716 * Code for each 'client' to run. 717 * 718 * @param id 719 * @param c 720 * @param sharedConnection 721 * @throws IOException 722 */ 723 static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException { 724 long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); 725 long startTime = System.currentTimeMillis(); 726 final int printInterval = 100000; 727 Random rd = new Random(id); 728 boolean get = c.getBoolean("hbase.test.do.gets", false); 729 TableName tableName = TableName.valueOf(BIG_USER_TABLE); 730 if (get) { 731 try (Table table = sharedConnection.getTable(tableName)){ 732 Stopwatch stopWatch = Stopwatch.createStarted(); 733 for (int i = 0; i < namespaceSpan; i++) { 734 byte [] b = format(rd.nextLong()); 735 Get g = new Get(b); 736 table.get(g); 737 if (i % printInterval == 0) { 738 LOG.info("Get " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 739 stopWatch.reset(); 740 stopWatch.start(); 741 } 742 } 743 LOG.info("Finished a cycle putting " + namespaceSpan + " in " + 744 (System.currentTimeMillis() - startTime) + "ms"); 745 } 746 } else { 747 try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) { 748 Stopwatch stopWatch = Stopwatch.createStarted(); 749 for (int i = 0; i < namespaceSpan; i++) { 750 byte [] b = format(rd.nextLong()); 751 Put p = new Put(b); 752 p.addColumn(HConstants.CATALOG_FAMILY, b, b); 753 mutator.mutate(p); 754 if (i % printInterval == 0) { 755 LOG.info("Put " + printInterval + "/" + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 756 stopWatch.reset(); 757 stopWatch.start(); 758 } 759 } 760 LOG.info("Finished a cycle putting " + namespaceSpan + " in " + 761 (System.currentTimeMillis() - startTime) + "ms"); 762 } 763 } 764 } 765 766 @Override 767 public int run(String[] arg0) throws Exception { 768 int errCode = 0; 769 // TODO: Make command options. 770 // How many servers to fake. 771 final int servers = 1; 772 // How many regions to put on the faked servers. 773 final int regions = 100000; 774 // How many 'keys' in the faked regions. 775 final long namespaceSpan = 50000000; 776 // How long to take to pause after doing a put; make this long if you want to fake a struggling 777 // server. 778 final long multiPause = 0; 779 // Check args make basic sense. 780 if ((namespaceSpan < regions) || (regions < servers)) { 781 throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" + 782 regions + " which must be > servers=" + servers); 783 } 784 785 // Set my many servers and many regions faking connection in place. 786 getConf().set("hbase.client.connection.impl", 787 ManyServersManyRegionsConnection.class.getName()); 788 // Use simple kv registry rather than zk 789 getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 790 // When to report fails. Default is we report the 10th. This means we'll see log everytime 791 // an exception is thrown -- usually RegionTooBusyException when we have more than 792 // hbase.test.multi.too.many requests outstanding at any time. 793 getConf().setInt("hbase.client.start.log.errors.counter", 0); 794 795 // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class. 796 getConf().setInt("hbase.test.regions", regions); 797 getConf().setLong("hbase.test.namespace.span", namespaceSpan); 798 getConf().setLong("hbase.test.servers", servers); 799 getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE)); 800 getConf().setLong("hbase.test.multi.pause.when.done", multiPause); 801 // Let there be ten outstanding requests at a time before we throw RegionBusyException. 802 getConf().setInt("hbase.test.multi.too.many", 10); 803 final int clients = 2; 804 805 // Have them all share the same connection so they all share the same instance of 806 // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server. 807 final ExecutorService pool = Executors.newCachedThreadPool( 808 new ThreadFactoryBuilder().setNameFormat("p-pool-%d") 809 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 810 // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p")); 811 // Share a connection so I can keep counts in the 'server' on concurrency. 812 final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/); 813 try { 814 Thread [] ts = new Thread[clients]; 815 for (int j = 0; j < ts.length; j++) { 816 final int id = j; 817 ts[j] = new Thread("" + j) { 818 final Configuration c = getConf(); 819 820 @Override 821 public void run() { 822 try { 823 cycle(id, c, sharedConnection); 824 } catch (IOException e) { 825 e.printStackTrace(); 826 } 827 } 828 }; 829 ts[j].start(); 830 } 831 for (int j = 0; j < ts.length; j++) { 832 ts[j].join(); 833 } 834 } finally { 835 sharedConnection.close(); 836 } 837 return errCode; 838 } 839 840 /** 841 * Run a client instance against a faked up server. 842 * @param args TODO 843 * @throws Exception 844 */ 845 public static void main(String[] args) throws Exception { 846 System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args)); 847 } 848}