001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertTrue; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.net.SocketTimeoutException; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Objects; 029import java.util.Random; 030import java.util.SortedMap; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ConcurrentSkipListMap; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.commons.lang3.NotImplementedException; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.conf.Configured; 040import org.apache.hadoop.hbase.CellComparatorImpl; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.HRegionInfo; 046import org.apache.hadoop.hbase.HRegionLocation; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.MetaCellComparator; 049import org.apache.hadoop.hbase.MetaTableAccessor; 050import org.apache.hadoop.hbase.RegionLocations; 051import org.apache.hadoop.hbase.RegionTooBusyException; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 055import org.apache.hadoop.hbase.security.User; 056import org.apache.hadoop.hbase.testclassification.ClientTests; 057import org.apache.hadoop.hbase.testclassification.SmallTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.Pair; 061import org.apache.hadoop.hbase.util.Threads; 062import org.apache.hadoop.util.Tool; 063import org.apache.hadoop.util.ToolRunner; 064import org.junit.Before; 065import org.junit.ClassRule; 066import org.junit.Ignore; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.mockito.Mockito; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch; 074import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 075import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 076import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 077import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 078 079import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 103 104/** 105 * Test client behavior w/o setting up a cluster. Mock up cluster emissions. See below for a method 106 * that tests retries/timeouts currently commented out. 107 */ 108@Category({ ClientTests.class, SmallTests.class }) 109public class TestClientNoCluster extends Configured implements Tool { 110 111 @ClassRule 112 public static final HBaseClassTestRule CLASS_RULE = 113 HBaseClassTestRule.forClass(TestClientNoCluster.class); 114 115 private static final Logger LOG = LoggerFactory.getLogger(TestClientNoCluster.class); 116 private Configuration conf; 117 /** 118 * A server that does not exist. I've changed the server in the below to 'localhost' so we have a 119 * servername that resolves -- otherwise, we just fail on server name lookup with UnknownHost... 120 * With localhost, was able to reproduce stack traces that looked like production stack traces. 121 * Was useful figuring out how retry/timeouts are functioning. 122 */ 123 public static final ServerName META_SERVERNAME = 124 ServerName.valueOf("meta.example.org", 16010, 12345); 125 126 @Before 127 public void setUp() throws Exception { 128 this.conf = HBaseConfiguration.create(); 129 // Run my Connection overrides. Use my little ConnectionImplementation below which 130 // allows me insert mocks and also use my Registry below rather than the default zk based 131 // one so tests run faster and don't have zk dependency. 132 this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 133 } 134 135 /** 136 * Simple cluster registry inserted in place of our usual zookeeper based one. 137 */ 138 static class SimpleRegistry extends DoNothingConnectionRegistry { 139 final ServerName META_HOST = META_SERVERNAME; 140 141 public SimpleRegistry(Configuration conf) { 142 super(conf); 143 } 144 145 @Override 146 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 147 return CompletableFuture.completedFuture(new RegionLocations( 148 new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, META_HOST))); 149 } 150 151 @Override 152 public CompletableFuture<String> getClusterId() { 153 return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT); 154 } 155 } 156 157 /** 158 * Remove the @Ignore to try out timeout and retry settings 159 */ 160 @Ignore 161 @Test 162 public void testTimeoutAndRetries() throws IOException { 163 Configuration localConfig = HBaseConfiguration.create(this.conf); 164 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 165 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 166 // localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 7); 167 Connection connection = ConnectionFactory.createConnection(localConfig); 168 Table table = connection.getTable(TableName.META_TABLE_NAME); 169 Throwable t = null; 170 LOG.info("Start"); 171 try { 172 // An exists call turns into a get w/ a flag. 173 table.exists(new Get(Bytes.toBytes("abc"))); 174 } catch (SocketTimeoutException e) { 175 // I expect this exception. 176 LOG.info("Got expected exception", e); 177 t = e; 178 } finally { 179 table.close(); 180 } 181 connection.close(); 182 LOG.info("Stop"); 183 assertTrue(t != null); 184 } 185 186 /** 187 * Remove the @Ignore to try out timeout and retry settings 188 */ 189 // @Ignore 190 @Test 191 public void testAsyncTimeoutAndRetries() 192 throws IOException, ExecutionException, InterruptedException { 193 Configuration localConfig = HBaseConfiguration.create(this.conf); 194 localConfig.set(ConnectionFactory.HBASE_CLIENT_ASYNC_CONNECTION_IMPL, 195 RpcTimeoutAsyncConnection.class.getName()); 196 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 9); 197 AsyncConnection connection = ConnectionFactory.createAsyncConnection(localConfig).get(); 198 AsyncTable table = connection.getTable(TableName.META_TABLE_NAME); 199 Throwable t = null; 200 LOG.info("Start"); 201 try { 202 // An exists call turns into a get w/ a flag. 203 table.exists(new Get(Bytes.toBytes("abc"))).get(); 204 } catch (Throwable throwable) { 205 // What to catch? 206 t = throwable; 207 } finally { 208 connection.close(); 209 } 210 LOG.info("Stop"); 211 assertTrue(t != null); 212 } 213 214 /** 215 * Test that operation timeout prevails over rpc default timeout and retries, etc. n 216 */ 217 @Test 218 public void testRpcTimeout() throws IOException { 219 Configuration localConfig = HBaseConfiguration.create(this.conf); 220 // This override mocks up our exists/get call to throw a RegionServerStoppedException. 221 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName()); 222 int pause = 10; 223 localConfig.setInt("hbase.client.pause", pause); 224 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10); 225 // Set the operation timeout to be < the pause. Expectation is that after first pause, we will 226 // fail out of the rpc because the rpc timeout will have been set to the operation tiemout 227 // and it has expired. Otherwise, if this functionality is broke, all retries will be run -- 228 // all ten of them -- and we'll get the RetriesExhaustedException exception. 229 localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1); 230 Connection connection = ConnectionFactory.createConnection(localConfig); 231 Table table = connection.getTable(TableName.META_TABLE_NAME); 232 Throwable t = null; 233 try { 234 // An exists call turns into a get w/ a flag. 235 table.exists(new Get(Bytes.toBytes("abc"))); 236 } catch (SocketTimeoutException e) { 237 // I expect this exception. 238 LOG.info("Got expected exception", e); 239 t = e; 240 } finally { 241 table.close(); 242 connection.close(); 243 } 244 assertTrue(t != null); 245 } 246 247 @Test 248 public void testDoNotRetryMetaTableAccessor() throws IOException { 249 this.conf.set("hbase.client.connection.impl", 250 RegionServerStoppedOnScannerOpenConnection.class.getName()); 251 try (Connection connection = ConnectionFactory.createConnection(conf)) { 252 MetaTableAccessor.fullScanRegions(connection); 253 } 254 } 255 256 @Test 257 public void testDoNotRetryOnScanNext() throws IOException { 258 this.conf.set("hbase.client.connection.impl", 259 RegionServerStoppedOnScannerOpenConnection.class.getName()); 260 // Go against meta else we will try to find first region for the table on construction which 261 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 262 // good for a bit of testing. 263 Connection connection = ConnectionFactory.createConnection(this.conf); 264 Table table = connection.getTable(TableName.META_TABLE_NAME); 265 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 266 try { 267 Result result = null; 268 while ((result = scanner.next()) != null) { 269 LOG.info(Objects.toString(result)); 270 } 271 } finally { 272 scanner.close(); 273 table.close(); 274 connection.close(); 275 } 276 } 277 278 @Test 279 public void testRegionServerStoppedOnScannerOpen() throws IOException { 280 this.conf.set("hbase.client.connection.impl", 281 RegionServerStoppedOnScannerOpenConnection.class.getName()); 282 // Go against meta else we will try to find first region for the table on construction which 283 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 284 // good for a bit of testing. 285 Connection connection = ConnectionFactory.createConnection(conf); 286 Table table = connection.getTable(TableName.META_TABLE_NAME); 287 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY); 288 try { 289 Result result = null; 290 while ((result = scanner.next()) != null) { 291 LOG.info(Objects.toString(result)); 292 } 293 } finally { 294 scanner.close(); 295 table.close(); 296 connection.close(); 297 } 298 } 299 300 @Test 301 public void testConnectionClosedOnRegionLocate() throws IOException { 302 Configuration testConf = new Configuration(this.conf); 303 testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 304 // Go against meta else we will try to find first region for the table on construction which 305 // means we'll have to do a bunch more mocking. Tests that go against meta only should be 306 // good for a bit of testing. 307 Connection connection = ConnectionFactory.createConnection(testConf); 308 Table table = connection.getTable(TableName.META_TABLE_NAME); 309 connection.close(); 310 try { 311 Get get = new Get(Bytes.toBytes("dummyRow")); 312 table.get(get); 313 fail("Should have thrown DoNotRetryException but no exception thrown"); 314 } catch (Exception e) { 315 if (!(e instanceof DoNotRetryIOException)) { 316 String errMsg = 317 "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName(); 318 LOG.error(errMsg, e); 319 fail(errMsg); 320 } 321 } finally { 322 table.close(); 323 } 324 } 325 326 /** 327 * Override to shutdown going to zookeeper for cluster id and meta location. 328 */ 329 static class RegionServerStoppedOnScannerOpenConnection extends ConnectionImplementation { 330 final ClientService.BlockingInterface stub; 331 332 RegionServerStoppedOnScannerOpenConnection(Configuration conf, ExecutorService pool, User user) 333 throws IOException { 334 super(conf, pool, user); 335 // Mock up my stub so open scanner returns a scanner id and then on next, we throw 336 // exceptions for three times and then after that, we return no more to scan. 337 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 338 long sid = 12345L; 339 try { 340 Mockito 341 .when(stub.scan((RpcController) Mockito.any(), (ClientProtos.ScanRequest) Mockito.any())) 342 .thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()) 343 .thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))) 344 .thenReturn( 345 ClientProtos.ScanResponse.newBuilder().setScannerId(sid).setMoreResults(false).build()); 346 } catch (ServiceException e) { 347 throw new IOException(e); 348 } 349 } 350 351 @Override 352 public BlockingInterface getClient(ServerName sn) throws IOException { 353 return this.stub; 354 } 355 } 356 357 /** 358 * Override to check we are setting rpc timeout right. 359 */ 360 static class RpcTimeoutConnection extends ConnectionImplementation { 361 final ClientService.BlockingInterface stub; 362 363 RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user) throws IOException { 364 super(conf, pool, user); 365 // Mock up my stub so an exists call -- which turns into a get -- throws an exception 366 this.stub = Mockito.mock(ClientService.BlockingInterface.class); 367 try { 368 Mockito 369 .when(stub.get((RpcController) Mockito.any(), (ClientProtos.GetRequest) Mockito.any())) 370 .thenThrow(new ServiceException(new java.net.ConnectException("Connection refused"))); 371 } catch (ServiceException e) { 372 throw new IOException(e); 373 } 374 } 375 376 @Override 377 public BlockingInterface getClient(ServerName sn) throws IOException { 378 return this.stub; 379 } 380 } 381 382 /** 383 * Override to check we are setting rpc timeout right. 384 */ 385 static class RpcTimeoutAsyncConnection extends AsyncConnectionImpl { 386 RpcTimeoutAsyncConnection(Configuration configuration, ConnectionRegistry registry, 387 String clusterId, User user) { 388 super(configuration, registry, clusterId, user); 389 } 390 } 391 392 /** 393 * Fake many regionservers and many regions on a connection implementation. 394 */ 395 static class ManyServersManyRegionsConnection extends ConnectionImplementation { 396 // All access should be synchronized 397 final Map<ServerName, ClientService.BlockingInterface> serversByClient; 398 399 /** 400 * Map of faked-up rows of a 'meta table'. 401 */ 402 final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta; 403 final AtomicLong sequenceids = new AtomicLong(0); 404 private final Configuration conf; 405 406 ManyServersManyRegionsConnection(Configuration conf, ExecutorService pool, User user) 407 throws IOException { 408 super(conf, pool, user); 409 int serverCount = conf.getInt("hbase.test.servers", 10); 410 this.serversByClient = new HashMap<>(serverCount); 411 this.meta = 412 makeMeta(Bytes.toBytes(conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))), 413 conf.getInt("hbase.test.regions", 100), conf.getLong("hbase.test.namespace.span", 1000), 414 serverCount); 415 this.conf = conf; 416 } 417 418 @Override 419 public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { 420 // if (!sn.toString().startsWith("meta")) LOG.info(sn); 421 ClientService.BlockingInterface stub = null; 422 synchronized (this.serversByClient) { 423 stub = this.serversByClient.get(sn); 424 if (stub == null) { 425 stub = new FakeServer(this.conf, meta, sequenceids); 426 this.serversByClient.put(sn, stub); 427 } 428 } 429 return stub; 430 } 431 } 432 433 static MultiResponse doMultiResponse(final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, 434 final AtomicLong sequenceids, final MultiRequest request) { 435 // Make a response to match the request. Act like there were no failures. 436 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); 437 // Per Region. 438 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 439 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); 440 for (RegionAction regionAction : request.getRegionActionList()) { 441 regionActionResultBuilder.clear(); 442 // Per Action in a Region. 443 for (ClientProtos.Action action : regionAction.getActionList()) { 444 roeBuilder.clear(); 445 // Return empty Result and proper index as result. 446 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); 447 roeBuilder.setIndex(action.getIndex()); 448 regionActionResultBuilder.addResultOrException(roeBuilder.build()); 449 } 450 builder.addRegionActionResult(regionActionResultBuilder.build()); 451 } 452 return builder.build(); 453 } 454 455 /** 456 * Fake 'server'. Implements the ClientService responding as though it were a 'server' (presumes a 457 * new ClientService.BlockingInterface made per server). 458 */ 459 static class FakeServer implements ClientService.BlockingInterface { 460 private AtomicInteger multiInvocationsCount = new AtomicInteger(0); 461 private final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta; 462 private final AtomicLong sequenceids; 463 private final long multiPause; 464 private final int tooManyMultiRequests; 465 466 FakeServer(final Configuration c, final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, 467 final AtomicLong sequenceids) { 468 this.meta = meta; 469 this.sequenceids = sequenceids; 470 471 // Pause to simulate the server taking time applying the edits. This will drive up the 472 // number of threads used over in client. 473 this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0); 474 this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3); 475 } 476 477 @Override 478 public GetResponse get(RpcController controller, GetRequest request) throws ServiceException { 479 boolean metaRegion = 480 isMetaRegion(request.getRegion().getValue().toByteArray(), request.getRegion().getType()); 481 if (!metaRegion) { 482 return doGetResponse(request); 483 } 484 return doMetaGetResponse(meta, request); 485 } 486 487 private GetResponse doGetResponse(GetRequest request) { 488 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 489 ByteString row = request.getGet().getRow(); 490 resultBuilder.addCell(getStartCode(row)); 491 GetResponse.Builder builder = GetResponse.newBuilder(); 492 builder.setResult(resultBuilder.build()); 493 return builder.build(); 494 } 495 496 @Override 497 public MutateResponse mutate(RpcController controller, MutateRequest request) 498 throws ServiceException { 499 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 500 } 501 502 @Override 503 public ScanResponse scan(RpcController controller, ScanRequest request) 504 throws ServiceException { 505 // Presume it is a scan of meta for now. Not all scans provide a region spec expecting 506 // the server to keep reference by scannerid. TODO. 507 return doMetaScanResponse(meta, sequenceids, request); 508 } 509 510 @Override 511 public BulkLoadHFileResponse bulkLoadHFile(RpcController controller, 512 BulkLoadHFileRequest request) throws ServiceException { 513 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 514 } 515 516 @Override 517 public CoprocessorServiceResponse execService(RpcController controller, 518 CoprocessorServiceRequest request) throws ServiceException { 519 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 520 } 521 522 @Override 523 public MultiResponse multi(RpcController controller, MultiRequest request) 524 throws ServiceException { 525 int concurrentInvocations = this.multiInvocationsCount.incrementAndGet(); 526 try { 527 if (concurrentInvocations >= tooManyMultiRequests) { 528 throw new ServiceException( 529 new RegionTooBusyException("concurrentInvocations=" + concurrentInvocations)); 530 } 531 Threads.sleep(multiPause); 532 return doMultiResponse(meta, sequenceids, request); 533 } finally { 534 this.multiInvocationsCount.decrementAndGet(); 535 } 536 } 537 538 @Override 539 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 540 CoprocessorServiceRequest request) throws ServiceException { 541 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 542 } 543 544 @Override 545 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 546 PrepareBulkLoadRequest request) throws ServiceException { 547 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 548 } 549 550 @Override 551 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 552 CleanupBulkLoadRequest request) throws ServiceException { 553 throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); 554 } 555 } 556 557 static ScanResponse doMetaScanResponse( 558 final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, final AtomicLong sequenceids, 559 final ScanRequest request) { 560 ScanResponse.Builder builder = ScanResponse.newBuilder(); 561 int max = request.getNumberOfRows(); 562 int count = 0; 563 Map<byte[], Pair<HRegionInfo, ServerName>> tail = 564 request.hasScan() ? meta.tailMap(request.getScan().getStartRow().toByteArray()) : meta; 565 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 566 for (Map.Entry<byte[], Pair<HRegionInfo, ServerName>> e : tail.entrySet()) { 567 // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only. 568 if (max <= 0) break; 569 if (++count > max) break; 570 HRegionInfo hri = e.getValue().getFirst(); 571 ByteString row = UnsafeByteOperations.unsafeWrap(hri.getRegionName()); 572 resultBuilder.clear(); 573 resultBuilder.addCell(getRegionInfo(row, hri)); 574 resultBuilder.addCell(getServer(row, e.getValue().getSecond())); 575 resultBuilder.addCell(getStartCode(row)); 576 builder.addResults(resultBuilder.build()); 577 // Set more to false if we are on the last region in table. 578 if (hri.getEndKey().length <= 0) builder.setMoreResults(false); 579 else builder.setMoreResults(true); 580 } 581 // If no scannerid, set one. 582 builder.setScannerId( 583 request.hasScannerId() ? request.getScannerId() : sequenceids.incrementAndGet()); 584 return builder.build(); 585 } 586 587 static GetResponse doMetaGetResponse(final SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta, 588 final GetRequest request) { 589 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder(); 590 ByteString row = request.getGet().getRow(); 591 Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray()); 592 if (p != null) { 593 resultBuilder.addCell(getRegionInfo(row, p.getFirst())); 594 resultBuilder.addCell(getServer(row, p.getSecond())); 595 } 596 resultBuilder.addCell(getStartCode(row)); 597 GetResponse.Builder builder = GetResponse.newBuilder(); 598 builder.setResult(resultBuilder.build()); 599 return builder.build(); 600 } 601 602 /** 603 * @param name region name or encoded region name. n * @return True if we are dealing with a 604 * hbase:meta region. 605 */ 606 static boolean isMetaRegion(final byte[] name, final RegionSpecifierType type) { 607 switch (type) { 608 case REGION_NAME: 609 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name); 610 case ENCODED_REGION_NAME: 611 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name); 612 default: 613 throw new UnsupportedOperationException(); 614 } 615 } 616 617 private final static ByteString CATALOG_FAMILY_BYTESTRING = 618 UnsafeByteOperations.unsafeWrap(HConstants.CATALOG_FAMILY); 619 private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING = 620 UnsafeByteOperations.unsafeWrap(HConstants.REGIONINFO_QUALIFIER); 621 private final static ByteString SERVER_QUALIFIER_BYTESTRING = 622 UnsafeByteOperations.unsafeWrap(HConstants.SERVER_QUALIFIER); 623 624 static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) { 625 CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder(); 626 cellBuilder.setRow(row); 627 cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING); 628 cellBuilder.setTimestamp(EnvironmentEdgeManager.currentTime()); 629 return cellBuilder; 630 } 631 632 static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) { 633 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 634 cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING); 635 cellBuilder.setValue(UnsafeByteOperations.unsafeWrap(hri.toByteArray())); 636 return cellBuilder.build(); 637 } 638 639 static CellProtos.Cell getServer(final ByteString row, final ServerName sn) { 640 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 641 cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING); 642 cellBuilder.setValue(ByteString.copyFromUtf8(sn.getAddress().toString())); 643 return cellBuilder.build(); 644 } 645 646 static CellProtos.Cell getStartCode(final ByteString row) { 647 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row); 648 cellBuilder.setQualifier(UnsafeByteOperations.unsafeWrap(HConstants.STARTCODE_QUALIFIER)); 649 // TODO: 650 cellBuilder 651 .setValue(UnsafeByteOperations.unsafeWrap(Bytes.toBytes(META_SERVERNAME.getStartcode()))); 652 return cellBuilder.build(); 653 } 654 655 private static final byte[] BIG_USER_TABLE = Bytes.toBytes("t"); 656 657 /** 658 * Format passed integer. Zero-pad. Copied from hbase-server PE class and small amendment. Make 659 * them share. n * @return Returns zero-prefixed 10-byte wide decimal version of passed number 660 * (Does absolute in case number is negative). 661 */ 662 private static byte[] format(final long number) { 663 byte[] b = new byte[10]; 664 long d = number; 665 for (int i = b.length - 1; i >= 0; i--) { 666 b[i] = (byte) ((d % 10) + '0'); 667 d /= 10; 668 } 669 return b; 670 } 671 672 /** 673 * nn * @return <code>count</code> regions 674 */ 675 private static HRegionInfo[] makeHRegionInfos(final byte[] tableName, final int count, 676 final long namespaceSpan) { 677 byte[] startKey = HConstants.EMPTY_BYTE_ARRAY; 678 byte[] endKey = HConstants.EMPTY_BYTE_ARRAY; 679 long interval = namespaceSpan / count; 680 HRegionInfo[] hris = new HRegionInfo[count]; 681 for (int i = 0; i < count; i++) { 682 if (i == 0) { 683 endKey = format(interval); 684 } else { 685 startKey = endKey; 686 if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY; 687 else endKey = format((i + 1) * interval); 688 } 689 hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey); 690 } 691 return hris; 692 } 693 694 /** 695 * n * @return Return <code>count</code> servernames. 696 */ 697 private static ServerName[] makeServerNames(final int count) { 698 ServerName[] sns = new ServerName[count]; 699 for (int i = 0; i < count; i++) { 700 sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i); 701 } 702 return sns; 703 } 704 705 /** 706 * Comparator for meta row keys. 707 */ 708 private static class MetaRowsComparator implements Comparator<byte[]> { 709 private final CellComparatorImpl delegate = MetaCellComparator.META_COMPARATOR; 710 711 @Override 712 public int compare(byte[] left, byte[] right) { 713 return delegate.compareRows(new KeyValue.KeyOnlyKeyValue(left), right, 0, right.length); 714 } 715 } 716 717 /** 718 * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and 719 * ServerName to return for this row. 720 * @return Map with faked hbase:meta content in it. 721 */ 722 static SortedMap<byte[], Pair<HRegionInfo, ServerName>> makeMeta(final byte[] tableName, 723 final int regionCount, final long namespaceSpan, final int serverCount) { 724 // I need a comparator for meta rows so we sort properly. 725 SortedMap<byte[], Pair<HRegionInfo, ServerName>> meta = 726 new ConcurrentSkipListMap<>(new MetaRowsComparator()); 727 HRegionInfo[] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan); 728 ServerName[] serverNames = makeServerNames(serverCount); 729 int per = regionCount / serverCount; 730 int count = 0; 731 for (HRegionInfo hri : hris) { 732 Pair<HRegionInfo, ServerName> p = new Pair<>(hri, serverNames[count++ / per]); 733 meta.put(hri.getRegionName(), p); 734 } 735 return meta; 736 } 737 738 /** 739 * Code for each 'client' to run. nnnn 740 */ 741 static void cycle(int id, final Configuration c, final Connection sharedConnection) 742 throws IOException { 743 long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); 744 long startTime = EnvironmentEdgeManager.currentTime(); 745 final int printInterval = 100000; 746 Random rd = new Random(id); 747 boolean get = c.getBoolean("hbase.test.do.gets", false); 748 TableName tableName = TableName.valueOf(BIG_USER_TABLE); 749 if (get) { 750 try (Table table = sharedConnection.getTable(tableName)) { 751 Stopwatch stopWatch = Stopwatch.createStarted(); 752 for (int i = 0; i < namespaceSpan; i++) { 753 byte[] b = format(rd.nextLong()); 754 Get g = new Get(b); 755 table.get(g); 756 if (i % printInterval == 0) { 757 LOG.info("Get " + printInterval + "/" 758 + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 759 stopWatch.reset(); 760 stopWatch.start(); 761 } 762 } 763 LOG.info("Finished a cycle putting " + namespaceSpan + " in " 764 + (EnvironmentEdgeManager.currentTime() - startTime) + "ms"); 765 } 766 } else { 767 try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) { 768 Stopwatch stopWatch = Stopwatch.createStarted(); 769 for (int i = 0; i < namespaceSpan; i++) { 770 byte[] b = format(rd.nextLong()); 771 Put p = new Put(b); 772 p.addColumn(HConstants.CATALOG_FAMILY, b, b); 773 mutator.mutate(p); 774 if (i % printInterval == 0) { 775 LOG.info("Put " + printInterval + "/" 776 + stopWatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); 777 stopWatch.reset(); 778 stopWatch.start(); 779 } 780 } 781 LOG.info("Finished a cycle putting " + namespaceSpan + " in " 782 + (EnvironmentEdgeManager.currentTime() - startTime) + "ms"); 783 } 784 } 785 } 786 787 @Override 788 public int run(String[] arg0) throws Exception { 789 int errCode = 0; 790 // TODO: Make command options. 791 // How many servers to fake. 792 final int servers = 1; 793 // How many regions to put on the faked servers. 794 final int regions = 100000; 795 // How many 'keys' in the faked regions. 796 final long namespaceSpan = 50000000; 797 // How long to take to pause after doing a put; make this long if you want to fake a struggling 798 // server. 799 final long multiPause = 0; 800 // Check args make basic sense. 801 if ((namespaceSpan < regions) || (regions < servers)) { 802 throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" 803 + regions + " which must be > servers=" + servers); 804 } 805 806 // Set my many servers and many regions faking connection in place. 807 getConf().set("hbase.client.connection.impl", ManyServersManyRegionsConnection.class.getName()); 808 // Use simple kv registry rather than zk 809 getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName()); 810 // When to report fails. Default is we report the 10th. This means we'll see log everytime 811 // an exception is thrown -- usually RegionTooBusyException when we have more than 812 // hbase.test.multi.too.many requests outstanding at any time. 813 getConf().setInt("hbase.client.start.log.errors.counter", 0); 814 815 // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class. 816 getConf().setInt("hbase.test.regions", regions); 817 getConf().setLong("hbase.test.namespace.span", namespaceSpan); 818 getConf().setLong("hbase.test.servers", servers); 819 getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE)); 820 getConf().setLong("hbase.test.multi.pause.when.done", multiPause); 821 // Let there be ten outstanding requests at a time before we throw RegionBusyException. 822 getConf().setInt("hbase.test.multi.too.many", 10); 823 final int clients = 2; 824 825 // Share a connection so I can keep counts in the 'server' on concurrency. 826 final Connection sharedConnection = ConnectionFactory.createConnection(getConf()); 827 try { 828 Thread[] ts = new Thread[clients]; 829 for (int j = 0; j < ts.length; j++) { 830 final int id = j; 831 ts[j] = new Thread("" + j) { 832 final Configuration c = getConf(); 833 834 @Override 835 public void run() { 836 try { 837 cycle(id, c, sharedConnection); 838 } catch (IOException e) { 839 LOG.info("Exception in cycle " + id, e); 840 } 841 } 842 }; 843 ts[j].start(); 844 } 845 for (int j = 0; j < ts.length; j++) { 846 ts[j].join(); 847 } 848 } finally { 849 sharedConnection.close(); 850 } 851 return errCode; 852 } 853 854 /** 855 * Run a client instance against a faked up server. 856 * @param args TODO n 857 */ 858 public static void main(String[] args) throws Exception { 859 System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args)); 860 } 861}