001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.Set; 035import java.util.TreeSet; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ExecutorService; 039import java.util.concurrent.Executors; 040import java.util.concurrent.Future; 041import java.util.concurrent.LinkedBlockingQueue; 042import java.util.concurrent.SynchronousQueue; 043import java.util.concurrent.ThreadFactory; 044import java.util.concurrent.ThreadPoolExecutor; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.concurrent.atomic.AtomicInteger; 048import java.util.concurrent.atomic.AtomicLong; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.CallDroppedException; 051import org.apache.hadoop.hbase.CallQueueTooBigException; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseClassTestRule; 054import org.apache.hadoop.hbase.HBaseServerException; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.HRegionInfo; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.RegionLocations; 059import org.apache.hadoop.hbase.ServerName; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess; 062import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; 063import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 064import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 065import org.apache.hadoop.hbase.client.coprocessor.Batch; 066import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 067import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 068import org.apache.hadoop.hbase.testclassification.ClientTests; 069import org.apache.hadoop.hbase.testclassification.LargeTests; 070import org.apache.hadoop.hbase.util.Bytes; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.Threads; 073import org.junit.Assert; 074import org.junit.Before; 075import org.junit.ClassRule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.mockito.Mockito; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 083 084@Category({ ClientTests.class, LargeTests.class }) 085public class TestAsyncProcess { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestAsyncProcess.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class); 092 private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); 093 private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1"); 094 private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); 095 private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); 096 private static final byte[] FAILS = Bytes.toBytes("FAILS"); 097 private Configuration CONF; 098 private ConnectionConfiguration CONNECTION_CONFIG; 099 private static final ServerName sn = ServerName.valueOf("s1,1,1"); 100 private static final ServerName sn2 = ServerName.valueOf("s2,2,2"); 101 private static final ServerName sn3 = ServerName.valueOf("s3,3,3"); 102 private static final HRegionInfo hri1 = 103 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); 104 private static final HRegionInfo hri2 = 105 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); 106 private static final HRegionInfo hri3 = 107 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); 108 private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn); 109 private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn); 110 private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2); 111 112 // Replica stuff 113 private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); 114 private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2); 115 private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); 116 private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), 117 new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)); 118 private static final RegionLocations hrls2 = 119 new RegionLocations(new HRegionLocation(hri2, sn2), new HRegionLocation(hri2r1, sn3)); 120 private static final RegionLocations hrls3 = 121 new RegionLocations(new HRegionLocation(hri3, sn3), null); 122 123 private static final String success = "success"; 124 private static Exception failure = new Exception("failure"); 125 126 private static final int NB_RETRIES = 3; 127 128 private int RPC_TIMEOUT; 129 private int OPERATION_TIMEOUT; 130 131 @Before 132 public void beforeEach() { 133 this.CONF = new Configuration(); 134 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); 135 this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF); 136 this.RPC_TIMEOUT = 137 CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 138 this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 139 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 140 } 141 142 static class CountingThreadFactory implements ThreadFactory { 143 final AtomicInteger nbThreads; 144 ThreadFactory realFactory = 145 new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d") 146 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(); 147 148 @Override 149 public Thread newThread(Runnable r) { 150 nbThreads.incrementAndGet(); 151 return realFactory.newThread(r); 152 } 153 154 CountingThreadFactory(AtomicInteger nbThreads) { 155 this.nbThreads = nbThreads; 156 } 157 } 158 159 static class MyAsyncProcess extends AsyncProcess { 160 final AtomicInteger nbMultiResponse = new AtomicInteger(); 161 final AtomicInteger nbActions = new AtomicInteger(); 162 public List<AsyncRequestFuture> allReqs = new ArrayList<>(); 163 public AtomicInteger callsCt = new AtomicInteger(); 164 private Configuration conf; 165 166 private long previousTimeout = -1; 167 final ExecutorService service; 168 169 @Override 170 protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(AsyncProcessTask task, 171 List<Action> actions, long nonceGroup) { 172 // Test HTable has tableName of null, so pass DUMMY_TABLE 173 AsyncProcessTask wrap = new AsyncProcessTask(task) { 174 @Override 175 public TableName getTableName() { 176 return DUMMY_TABLE; 177 } 178 }; 179 AsyncRequestFutureImpl<Res> r = 180 new MyAsyncRequestFutureImpl<>(wrap, actions, nonceGroup, this); 181 allReqs.add(r); 182 return r; 183 } 184 185 public MyAsyncProcess(ClusterConnection hc, Configuration conf) { 186 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 187 service = Executors.newFixedThreadPool(5); 188 this.conf = conf; 189 } 190 191 public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { 192 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 193 service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), 194 new CountingThreadFactory(nbThreads)); 195 } 196 197 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, 198 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, 199 boolean needResults) throws InterruptedIOException { 200 AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) 201 .setPool(pool == null ? service : pool).setTableName(tableName).setRowAccess(rows) 202 .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) 203 .setNeedResults(needResults) 204 .setRpcTimeout( 205 conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)) 206 .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 207 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)) 208 .build(); 209 return submit(task); 210 } 211 212 public <CResult> AsyncRequestFuture submit(TableName tableName, final List<? extends Row> rows, 213 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults) 214 throws InterruptedIOException { 215 return submit(null, tableName, rows, atLeastOne, callback, needResults); 216 } 217 218 @Override 219 public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task) 220 throws InterruptedIOException { 221 previousTimeout = task.getRpcTimeout(); 222 // We use results in tests to check things, so override to always save them. 223 AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) { 224 @Override 225 public boolean getNeedResults() { 226 return true; 227 } 228 }; 229 return super.submit(wrap); 230 } 231 232 @Override 233 protected RpcRetryingCaller<AbstractResponse> 234 createCaller(CancellableRegionServerCallable callable, int rpcTimeout) { 235 callsCt.incrementAndGet(); 236 MultiServerCallable callable1 = (MultiServerCallable) callable; 237 final MultiResponse mr = createMultiResponse(callable1.getMulti(), nbMultiResponse, nbActions, 238 new ResponseGenerator() { 239 @Override 240 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 241 if (Arrays.equals(FAILS, a.getAction().getRow())) { 242 mr.add(regionName, a.getOriginalIndex(), failure); 243 } else { 244 mr.add(regionName, a.getOriginalIndex(), success); 245 } 246 } 247 }); 248 249 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { 250 @Override 251 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 252 int callTimeout) throws IOException, RuntimeException { 253 try { 254 // sleep one second in order for threadpool to start another thread instead of reusing 255 // existing one. 256 Thread.sleep(1000); 257 } catch (InterruptedException e) { 258 // ignore error 259 } 260 return mr; 261 } 262 }; 263 } 264 265 } 266 267 static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { 268 private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); 269 270 public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup, 271 AsyncProcess asyncProcess) { 272 super(task, actions, nonceGroup, asyncProcess); 273 } 274 275 @Override 276 protected void updateStats(ServerName server, MultiResponse resp) { 277 // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. 278 } 279 280 Map<ServerName, List<Long>> getRequestHeapSize() { 281 return heapSizesByServer; 282 } 283 284 @Override 285 SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, 286 ServerName server, Set<CancellableRegionServerCallable> callsInProgress) { 287 SingleServerRequestRunnable rq = 288 new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); 289 List<Long> heapCount = heapSizesByServer.get(server); 290 if (heapCount == null) { 291 heapCount = new ArrayList<>(); 292 heapSizesByServer.put(server, heapCount); 293 } 294 heapCount.add(heapSizeOf(multiAction)); 295 return rq; 296 } 297 298 private long heapSizeOf(MultiAction multiAction) { 299 return multiAction.actions.values().stream().flatMap(v -> v.stream()) 300 .map(action -> action.getAction()).filter(row -> row instanceof Mutation) 301 .mapToLong(row -> ((Mutation) row).heapSize()).sum(); 302 } 303 } 304 305 static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse> { 306 307 private final IOException e; 308 309 public CallerWithFailure(IOException e) { 310 super(100, 500, 100, 9); 311 this.e = e; 312 } 313 314 @Override 315 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 316 int callTimeout) throws IOException, RuntimeException { 317 throw e; 318 } 319 } 320 321 static class AsyncProcessWithFailure extends MyAsyncProcess { 322 323 private final IOException ioe; 324 325 public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) { 326 super(hc, conf); 327 this.ioe = ioe; 328 serverTrackerTimeout = 1L; 329 } 330 331 @Override 332 protected RpcRetryingCaller<AbstractResponse> 333 createCaller(CancellableRegionServerCallable callable, int rpcTimeout) { 334 callsCt.incrementAndGet(); 335 return new CallerWithFailure(ioe); 336 } 337 } 338 339 /** 340 * Make the backoff time always different on each call. 341 */ 342 static class MyClientBackoffPolicy implements ClientBackoffPolicy { 343 private final Map<ServerName, AtomicInteger> count = new HashMap<>(); 344 345 @Override 346 public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { 347 AtomicInteger inc = count.get(serverName); 348 if (inc == null) { 349 inc = new AtomicInteger(0); 350 count.put(serverName, inc); 351 } 352 return inc.getAndIncrement(); 353 } 354 } 355 356 static class MyAsyncProcessWithReplicas extends MyAsyncProcess { 357 private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator()); 358 private long primarySleepMs = 0, replicaSleepMs = 0; 359 private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>(); 360 private final AtomicLong replicaCalls = new AtomicLong(0); 361 362 public void addFailures(RegionInfo... hris) { 363 for (RegionInfo hri : hris) { 364 failures.add(hri.getRegionName()); 365 } 366 } 367 368 public long getReplicaCallCount() { 369 return replicaCalls.get(); 370 } 371 372 public void setPrimaryCallDelay(ServerName server, long primaryMs) { 373 customPrimarySleepMs.put(server, primaryMs); 374 } 375 376 public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) { 377 super(hc, conf); 378 } 379 380 public void setCallDelays(long primaryMs, long replicaMs) { 381 this.primarySleepMs = primaryMs; 382 this.replicaSleepMs = replicaMs; 383 } 384 385 @Override 386 protected RpcRetryingCaller<AbstractResponse> 387 createCaller(CancellableRegionServerCallable payloadCallable, int rpcTimeout) { 388 MultiServerCallable callable = (MultiServerCallable) payloadCallable; 389 final MultiResponse mr = createMultiResponse(callable.getMulti(), nbMultiResponse, nbActions, 390 new ResponseGenerator() { 391 @Override 392 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 393 if (failures.contains(regionName)) { 394 mr.add(regionName, a.getOriginalIndex(), failure); 395 } else { 396 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId()); 397 mr.add(regionName, a.getOriginalIndex(), Result.create(new Cell[0], null, isStale)); 398 } 399 } 400 }); 401 // Currently AsyncProcess either sends all-replica, or all-primary request. 402 final boolean isDefault = RegionReplicaUtil.isDefaultReplica( 403 callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId()); 404 final ServerName server = ((MultiServerCallable) callable).getServerName(); 405 String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " 406 + callable.getMulti().actions.size() + " entries: "; 407 for (byte[] region : callable.getMulti().actions.keySet()) { 408 debugMsg += "[" + Bytes.toStringBinary(region) + "], "; 409 } 410 LOG.debug(debugMsg); 411 if (!isDefault) { 412 replicaCalls.incrementAndGet(); 413 } 414 415 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { 416 @Override 417 public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 418 int callTimeout) throws IOException, RuntimeException { 419 long sleep = -1; 420 if (isDefault) { 421 Long customSleep = customPrimarySleepMs.get(server); 422 sleep = (customSleep == null ? primarySleepMs : customSleep.longValue()); 423 } else { 424 sleep = replicaSleepMs; 425 } 426 if (sleep != 0) { 427 try { 428 Thread.sleep(sleep); 429 } catch (InterruptedException e) { 430 // Restore interrupt status 431 Thread.currentThread().interrupt(); 432 } 433 } 434 return mr; 435 } 436 }; 437 } 438 } 439 440 static MultiResponse createMultiResponse(final MultiAction multi, AtomicInteger nbMultiResponse, 441 AtomicInteger nbActions, ResponseGenerator gen) { 442 final MultiResponse mr = new MultiResponse(); 443 nbMultiResponse.incrementAndGet(); 444 for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) { 445 byte[] regionName = entry.getKey(); 446 for (Action a : entry.getValue()) { 447 nbActions.incrementAndGet(); 448 gen.addResponse(mr, regionName, a); 449 } 450 } 451 return mr; 452 } 453 454 private static interface ResponseGenerator { 455 void addResponse(final MultiResponse mr, byte[] regionName, Action a); 456 } 457 458 /** 459 * Returns our async process. 460 */ 461 static class MyConnectionImpl extends ConnectionImplementation { 462 public static class TestRegistry extends DoNothingConnectionRegistry { 463 464 public TestRegistry(Configuration conf) { 465 super(conf); 466 } 467 468 @Override 469 public CompletableFuture<String> getClusterId() { 470 return CompletableFuture.completedFuture("testClusterId"); 471 } 472 } 473 474 final AtomicInteger nbThreads = new AtomicInteger(0); 475 476 protected MyConnectionImpl(Configuration conf) throws IOException { 477 super(setupConf(conf), null, null); 478 } 479 480 private static Configuration setupConf(Configuration conf) { 481 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, TestRegistry.class, 482 ConnectionRegistry.class); 483 return conf; 484 } 485 486 @Override 487 public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, 488 boolean retry, int replicaId) throws IOException { 489 return new RegionLocations(loc1); 490 } 491 492 @Override 493 public boolean hasCellBlockSupport() { 494 return false; 495 } 496 } 497 498 /** 499 * Returns our async process. 500 */ 501 static class MyConnectionImpl2 extends MyConnectionImpl { 502 List<HRegionLocation> hrl; 503 final boolean usedRegions[]; 504 505 protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException { 506 super(conf); 507 this.hrl = hrl; 508 this.usedRegions = new boolean[hrl.size()]; 509 } 510 511 @Override 512 public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, 513 boolean retry, int replicaId) throws IOException { 514 int i = 0; 515 for (HRegionLocation hr : hrl) { 516 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { 517 usedRegions[i] = true; 518 return new RegionLocations(hr); 519 } 520 i++; 521 } 522 return null; 523 } 524 } 525 526 @Test 527 public void testListRowAccess() { 528 int count = 10; 529 List<String> values = new ArrayList<>(); 530 for (int i = 0; i != count; ++i) { 531 values.add(String.valueOf(i)); 532 } 533 534 ListRowAccess<String> taker = new ListRowAccess<>(values); 535 assertEquals(count, taker.size()); 536 537 int takeCount = 0; 538 Iterator<String> it = taker.iterator(); 539 while (it.hasNext()) { 540 String v = it.next(); 541 assertEquals(String.valueOf(takeCount), v); 542 ++takeCount; 543 it.remove(); 544 if (Math.random() >= 0.5) { 545 break; 546 } 547 } 548 assertEquals(count, taker.size() + takeCount); 549 550 it = taker.iterator(); 551 while (it.hasNext()) { 552 String v = it.next(); 553 assertEquals(String.valueOf(takeCount), v); 554 ++takeCount; 555 it.remove(); 556 } 557 assertEquals(0, taker.size()); 558 assertEquals(count, takeCount); 559 } 560 561 private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) { 562 if (putSizePerServer <= maxHeapSizePerRequest) { 563 return 1; 564 } else if (putSizePerServer % maxHeapSizePerRequest == 0) { 565 return putSizePerServer / maxHeapSizePerRequest; 566 } else { 567 return putSizePerServer / maxHeapSizePerRequest + 1; 568 } 569 } 570 571 @Test 572 public void testSubmitSameSizeOfRequest() throws Exception { 573 long writeBuffer = 2 * 1024 * 1024; 574 long putsHeapSize = writeBuffer; 575 doSubmitRequest(writeBuffer, putsHeapSize); 576 } 577 578 @Test 579 public void testSubmitLargeRequestWithUnlimitedSize() throws Exception { 580 long maxHeapSizePerRequest = Long.MAX_VALUE; 581 long putsHeapSize = 2 * 1024 * 1024; 582 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 583 } 584 585 @Test 586 public void testSubmitRandomSizeRequest() throws Exception { 587 Random rn = new Random(); 588 final long limit = 10 * 1024 * 1024; 589 final int requestCount = 1 + (int) (rn.nextDouble() * 3); 590 long n = rn.nextLong(); 591 if (n < 0) { 592 n = -n; 593 } else if (n == 0) { 594 n = 1; 595 } 596 long putsHeapSize = n % limit; 597 long maxHeapSizePerRequest = putsHeapSize / requestCount; 598 LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest 599 + ", putsHeapSize=" + putsHeapSize); 600 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 601 } 602 603 @Test 604 public void testSubmitSmallRequest() throws Exception { 605 long maxHeapSizePerRequest = 2 * 1024 * 1024; 606 long putsHeapSize = 100; 607 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 608 } 609 610 @Test 611 public void testSubmitLargeRequest() throws Exception { 612 long maxHeapSizePerRequest = 2 * 1024 * 1024; 613 long putsHeapSize = maxHeapSizePerRequest * 2; 614 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 615 } 616 617 private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception { 618 ClusterConnection conn = createHConnection(); 619 final String defaultClazz = 620 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 621 final long defaultHeapSizePerRequest = 622 conn.getConfiguration().getLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 623 SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); 624 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 625 SimpleRequestController.class.getName()); 626 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 627 maxHeapSizePerRequest); 628 629 // sn has two regions 630 long putSizeSN = 0; 631 long putSizeSN2 = 0; 632 List<Put> puts = new ArrayList<>(); 633 while ((putSizeSN + putSizeSN2) <= putsHeapSize) { 634 Put put1 = new Put(DUMMY_BYTES_1); 635 put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 636 Put put2 = new Put(DUMMY_BYTES_2); 637 put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 638 Put put3 = new Put(DUMMY_BYTES_3); 639 put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3); 640 putSizeSN += (put1.heapSize() + put2.heapSize()); 641 putSizeSN2 += put3.heapSize(); 642 puts.add(put1); 643 puts.add(put2); 644 puts.add(put3); 645 } 646 647 int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest); 648 int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest); 649 LOG.info("Total put count:" + puts.size() + ", putSizeSN:" + putSizeSN + ", putSizeSN2:" 650 + putSizeSN2 + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:" 651 + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request); 652 653 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 654 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 655 try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) { 656 mutator.mutate(puts); 657 mutator.flush(); 658 List<AsyncRequestFuture> reqs = ap.allReqs; 659 660 int actualSnReqCount = 0; 661 int actualSn2ReqCount = 0; 662 for (AsyncRequestFuture req : reqs) { 663 if (!(req instanceof AsyncRequestFutureImpl)) { 664 continue; 665 } 666 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 667 if (ars.getRequestHeapSize().containsKey(sn)) { 668 ++actualSnReqCount; 669 } 670 if (ars.getRequestHeapSize().containsKey(sn2)) { 671 ++actualSn2ReqCount; 672 } 673 } 674 // If the server is busy, the actual count may be incremented. 675 assertEquals(true, minCountSnRequest <= actualSnReqCount); 676 assertEquals(true, minCountSn2Request <= actualSn2ReqCount); 677 Map<ServerName, Long> sizePerServers = new HashMap<>(); 678 for (AsyncRequestFuture req : reqs) { 679 if (!(req instanceof AsyncRequestFutureImpl)) { 680 continue; 681 } 682 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 683 Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); 684 for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { 685 long sum = 0; 686 for (long size : entry.getValue()) { 687 assertEquals(true, size <= maxHeapSizePerRequest); 688 sum += size; 689 } 690 assertEquals(true, sum <= maxHeapSizePerRequest); 691 long value = sizePerServers.getOrDefault(entry.getKey(), 0L); 692 sizePerServers.put(entry.getKey(), value + sum); 693 } 694 } 695 assertEquals(true, sizePerServers.containsKey(sn)); 696 assertEquals(true, sizePerServers.containsKey(sn2)); 697 assertEquals(false, sizePerServers.containsKey(sn3)); 698 assertEquals(putSizeSN, (long) sizePerServers.get(sn)); 699 assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); 700 } 701 // restore config. 702 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 703 defaultHeapSizePerRequest); 704 if (defaultClazz != null) { 705 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 706 defaultClazz); 707 } 708 } 709 710 @Test 711 public void testSubmit() throws Exception { 712 ClusterConnection hc = createHConnection(); 713 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 714 715 List<Put> puts = new ArrayList<>(1); 716 puts.add(createPut(1, true)); 717 718 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 719 Assert.assertTrue(puts.isEmpty()); 720 } 721 722 @Test 723 public void testSubmitWithCB() throws Exception { 724 ClusterConnection hc = createHConnection(); 725 final AtomicInteger updateCalled = new AtomicInteger(0); 726 Batch.Callback<Object> cb = new Batch.Callback<Object>() { 727 @Override 728 public void update(byte[] region, byte[] row, Object result) { 729 updateCalled.incrementAndGet(); 730 } 731 }; 732 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 733 734 List<Put> puts = new ArrayList<>(1); 735 puts.add(createPut(1, true)); 736 737 final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false); 738 Assert.assertTrue(puts.isEmpty()); 739 ars.waitUntilDone(); 740 Assert.assertEquals(1, updateCalled.get()); 741 } 742 743 @Test 744 public void testSubmitBusyRegion() throws Exception { 745 ClusterConnection conn = createHConnection(); 746 final String defaultClazz = 747 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 748 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 749 SimpleRequestController.class.getName()); 750 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 751 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 752 List<Put> puts = new ArrayList<>(1); 753 puts.add(createPut(1, true)); 754 755 for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) { 756 ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 757 } 758 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 759 Assert.assertEquals(puts.size(), 1); 760 761 ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 762 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 763 Assert.assertEquals(0, puts.size()); 764 if (defaultClazz != null) { 765 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 766 defaultClazz); 767 } 768 } 769 770 @Test 771 public void testSubmitBusyRegionServer() throws Exception { 772 ClusterConnection conn = createHConnection(); 773 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 774 final String defaultClazz = 775 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 776 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 777 SimpleRequestController.class.getName()); 778 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 779 controller.taskCounterPerServer.put(sn2, 780 new AtomicInteger(controller.maxConcurrentTasksPerServer)); 781 782 List<Put> puts = new ArrayList<>(4); 783 puts.add(createPut(1, true)); 784 puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy 785 puts.add(createPut(1, true)); // <== this one will make it, the region is already in 786 puts.add(createPut(2, true)); // <== new region, but the rs is ok 787 788 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 789 Assert.assertEquals(" puts=" + puts, 1, puts.size()); 790 791 controller.taskCounterPerServer.put(sn2, 792 new AtomicInteger(controller.maxConcurrentTasksPerServer - 1)); 793 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 794 Assert.assertTrue(puts.isEmpty()); 795 if (defaultClazz != null) { 796 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 797 defaultClazz); 798 } 799 } 800 801 @Test 802 public void testFail() throws Exception { 803 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 804 805 List<Put> puts = new ArrayList<>(1); 806 Put p = createPut(1, false); 807 puts.add(p); 808 809 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 810 Assert.assertEquals(0, puts.size()); 811 ars.waitUntilDone(); 812 verifyResult(ars, false); 813 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 814 815 Assert.assertEquals(1, ars.getErrors().exceptions.size()); 816 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 817 failure.equals(ars.getErrors().exceptions.get(0))); 818 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 819 failure.equals(ars.getErrors().exceptions.get(0))); 820 821 Assert.assertEquals(1, ars.getFailedOperations().size()); 822 Assert.assertTrue("was: " + ars.getFailedOperations().get(0), 823 p.equals(ars.getFailedOperations().get(0))); 824 } 825 826 @Test 827 public void testSubmitTrue() throws IOException { 828 ClusterConnection conn = createHConnection(); 829 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 830 final String defaultClazz = 831 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 832 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 833 SimpleRequestController.class.getName()); 834 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 835 controller.tasksInProgress.incrementAndGet(); 836 final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion); 837 controller.taskCounterPerRegion.put(hri1.getRegionName(), ai); 838 839 final AtomicBoolean checkPoint = new AtomicBoolean(false); 840 final AtomicBoolean checkPoint2 = new AtomicBoolean(false); 841 842 Thread t = new Thread() { 843 @Override 844 public void run() { 845 Threads.sleep(1000); 846 Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent 847 ai.decrementAndGet(); 848 controller.tasksInProgress.decrementAndGet(); 849 checkPoint2.set(true); 850 } 851 }; 852 853 List<Put> puts = new ArrayList<>(1); 854 Put p = createPut(1, true); 855 puts.add(p); 856 857 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 858 Assert.assertFalse(puts.isEmpty()); 859 860 t.start(); 861 862 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 863 Assert.assertTrue(puts.isEmpty()); 864 865 checkPoint.set(true); 866 while (!checkPoint2.get()) { 867 Threads.sleep(1); 868 } 869 if (defaultClazz != null) { 870 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 871 defaultClazz); 872 } 873 } 874 875 @Test 876 public void testFailAndSuccess() throws Exception { 877 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 878 879 List<Put> puts = new ArrayList<>(3); 880 puts.add(createPut(1, false)); 881 puts.add(createPut(1, true)); 882 puts.add(createPut(1, true)); 883 884 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 885 Assert.assertTrue(puts.isEmpty()); 886 ars.waitUntilDone(); 887 verifyResult(ars, false, true, true); 888 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 889 ap.callsCt.set(0); 890 Assert.assertEquals(1, ars.getErrors().actions.size()); 891 892 puts.add(createPut(1, true)); 893 // Wait for AP to be free. While ars might have the result, ap counters are decreased later. 894 ap.waitForMaximumCurrentTasks(0, null); 895 ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 896 Assert.assertEquals(0, puts.size()); 897 ars.waitUntilDone(); 898 Assert.assertEquals(1, ap.callsCt.get()); 899 verifyResult(ars, true); 900 } 901 902 @Test 903 public void testFlush() throws Exception { 904 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 905 906 List<Put> puts = new ArrayList<>(3); 907 puts.add(createPut(1, false)); 908 puts.add(createPut(1, true)); 909 puts.add(createPut(1, true)); 910 911 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 912 ars.waitUntilDone(); 913 verifyResult(ars, false, true, true); 914 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 915 916 Assert.assertEquals(1, ars.getFailedOperations().size()); 917 } 918 919 @Test 920 public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { 921 ClusterConnection hc = createHConnection(); 922 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 923 testTaskCount(ap); 924 } 925 926 @Test 927 public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { 928 Configuration copyConf = new Configuration(CONF); 929 copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); 930 MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); 931 ClusterConnection conn = createHConnection(); 932 Mockito.when(conn.getConfiguration()).thenReturn(copyConf); 933 Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); 934 Mockito.when(conn.getBackoffPolicy()).thenReturn(bp); 935 final String defaultClazz = 936 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 937 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 938 SimpleRequestController.class.getName()); 939 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 940 testTaskCount(ap); 941 if (defaultClazz != null) { 942 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 943 defaultClazz); 944 } 945 } 946 947 private void testTaskCount(MyAsyncProcess ap) 948 throws InterruptedIOException, InterruptedException { 949 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 950 List<Put> puts = new ArrayList<>(); 951 for (int i = 0; i != 3; ++i) { 952 puts.add(createPut(1, true)); 953 puts.add(createPut(2, true)); 954 puts.add(createPut(3, true)); 955 } 956 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 957 ap.waitForMaximumCurrentTasks(0, null); 958 // More time to wait if there are incorrect task count. 959 TimeUnit.SECONDS.sleep(1); 960 assertEquals(0, controller.tasksInProgress.get()); 961 for (AtomicInteger count : controller.taskCounterPerRegion.values()) { 962 assertEquals(0, count.get()); 963 } 964 for (AtomicInteger count : controller.taskCounterPerServer.values()) { 965 assertEquals(0, count.get()); 966 } 967 } 968 969 @Test 970 public void testMaxTask() throws Exception { 971 ClusterConnection conn = createHConnection(); 972 final String defaultClazz = 973 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 974 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 975 SimpleRequestController.class.getName()); 976 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 977 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 978 979 for (int i = 0; i < 1000; i++) { 980 ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 981 } 982 983 final Thread myThread = Thread.currentThread(); 984 985 Thread t = new Thread() { 986 @Override 987 public void run() { 988 Threads.sleep(2000); 989 myThread.interrupt(); 990 } 991 }; 992 993 List<Put> puts = new ArrayList<>(1); 994 puts.add(createPut(1, true)); 995 996 t.start(); 997 998 try { 999 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 1000 Assert.fail("We should have been interrupted."); 1001 } catch (InterruptedIOException expected) { 1002 } 1003 1004 final long sleepTime = 2000; 1005 1006 Thread t2 = new Thread() { 1007 @Override 1008 public void run() { 1009 Threads.sleep(sleepTime); 1010 while (controller.tasksInProgress.get() > 0) { 1011 ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 1012 } 1013 } 1014 }; 1015 t2.start(); 1016 1017 long start = EnvironmentEdgeManager.currentTime(); 1018 ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false); 1019 long end = EnvironmentEdgeManager.currentTime(); 1020 1021 // Adds 100 to secure us against approximate timing. 1022 Assert.assertTrue(start + 100L + sleepTime > end); 1023 if (defaultClazz != null) { 1024 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 1025 defaultClazz); 1026 } 1027 } 1028 1029 private ClusterConnection createHConnection() throws IOException { 1030 return createHConnection(CONNECTION_CONFIG); 1031 } 1032 1033 private ClusterConnection createHConnection(ConnectionConfiguration configuration) 1034 throws IOException { 1035 ClusterConnection hc = createHConnectionCommon(configuration); 1036 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); 1037 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); 1038 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); 1039 Mockito 1040 .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) 1041 .thenReturn(Arrays.asList(loc1, loc2, loc3)); 1042 setMockLocation(hc, FAILS, new RegionLocations(loc2)); 1043 return hc; 1044 } 1045 1046 private ClusterConnection createHConnectionWithReplicas(ConnectionConfiguration configuration) 1047 throws IOException { 1048 ClusterConnection hc = createHConnectionCommon(configuration); 1049 setMockLocation(hc, DUMMY_BYTES_1, hrls1); 1050 setMockLocation(hc, DUMMY_BYTES_2, hrls2); 1051 setMockLocation(hc, DUMMY_BYTES_3, hrls3); 1052 List<HRegionLocation> locations = new ArrayList<>(); 1053 for (HRegionLocation loc : hrls1.getRegionLocations()) { 1054 locations.add(loc); 1055 } 1056 for (HRegionLocation loc : hrls2.getRegionLocations()) { 1057 locations.add(loc); 1058 } 1059 for (HRegionLocation loc : hrls3.getRegionLocations()) { 1060 locations.add(loc); 1061 } 1062 Mockito 1063 .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean())) 1064 .thenReturn(locations); 1065 return hc; 1066 } 1067 1068 private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result) 1069 throws IOException { 1070 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 1071 Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); 1072 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(), 1073 Mockito.anyBoolean())).thenReturn(result); 1074 } 1075 1076 private ClusterConnection 1077 createHConnectionCommon(ConnectionConfiguration connectionConfiguration) { 1078 ClusterConnection hc = Mockito.mock(ClusterConnection.class); 1079 NonceGenerator ng = Mockito.mock(NonceGenerator.class); 1080 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); 1081 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); 1082 Mockito.when(hc.getConfiguration()).thenReturn(CONF); 1083 Mockito.when(hc.getConnectionConfiguration()).thenReturn(connectionConfiguration); 1084 return hc; 1085 } 1086 1087 @Test 1088 public void testHTablePutSuccess() throws Exception { 1089 ClusterConnection conn = createHConnection(); 1090 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1091 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1092 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1093 1094 Put put = createPut(1, true); 1095 1096 Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), 1097 ht.getWriteBufferSize()); 1098 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1099 ht.mutate(put); 1100 ht.flush(); 1101 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1102 } 1103 1104 @Test 1105 public void testSettingWriteBufferPeriodicFlushParameters() throws Exception { 1106 ClusterConnection conn = createHConnection(); 1107 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1108 1109 checkPeriodicFlushParameters(conn, ap, 1234, 1234, 1234, 1234); 1110 checkPeriodicFlushParameters(conn, ap, 0, 0, 0, 1111 BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1112 checkPeriodicFlushParameters(conn, ap, -1234, 0, -1234, 1113 BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1114 checkPeriodicFlushParameters(conn, ap, 1, 1, 1, 1115 BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1116 } 1117 1118 private void checkPeriodicFlushParameters(ClusterConnection conn, MyAsyncProcess ap, long setTO, 1119 long expectTO, long setTT, long expectTT) { 1120 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1121 1122 // The BufferedMutatorParams does nothing with the value 1123 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO); 1124 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT); 1125 Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs()); 1126 Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs()); 1127 1128 // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams) 1129 BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap); 1130 Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs()); 1131 Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs()); 1132 1133 // The BufferedMutatorImpl corrects illegal values (direct via setter) 1134 BufferedMutatorImpl ht2 = 1135 new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap); 1136 ht2.setWriteBufferPeriodicFlush(setTO, setTT); 1137 Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs()); 1138 Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs()); 1139 1140 } 1141 1142 @Test 1143 public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception { 1144 ClusterConnection conn = createHConnection(); 1145 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1146 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1147 1148 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP 1149 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms 1150 bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record 1151 1152 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1153 1154 // Verify if BufferedMutator has the right settings. 1155 Assert.assertEquals(10000, ht.getWriteBufferSize()); 1156 Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs()); 1157 Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, 1158 ht.getWriteBufferPeriodicFlushTimerTickMs()); 1159 1160 Put put = createPut(1, true); 1161 1162 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1163 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1164 1165 // ----- Insert, flush immediately, MUST NOT flush automatically 1166 ht.mutate(put); 1167 ht.flush(); 1168 1169 Thread.sleep(1000); 1170 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1171 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1172 1173 // ----- Insert, NO flush, MUST flush automatically 1174 ht.mutate(put); 1175 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1176 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1177 1178 // The timerTick should fire every 100ms, so after twice that we must have 1179 // seen at least 1 tick and we should see an automatic flush 1180 Thread.sleep(200); 1181 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1182 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1183 1184 // Ensure it does not flush twice 1185 Thread.sleep(200); 1186 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1187 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1188 1189 // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically 1190 ht.disableWriteBufferPeriodicFlush(); 1191 ht.mutate(put); 1192 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1193 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1194 1195 // Wait for at least 1 timerTick, we should see NO flushes. 1196 Thread.sleep(200); 1197 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1198 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1199 1200 // Reenable periodic flushing, a flush seems to take about 1 second 1201 // so we wait for 2 seconds and it should have finished the flush. 1202 ht.setWriteBufferPeriodicFlush(1, 100); 1203 Thread.sleep(2000); 1204 Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes()); 1205 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1206 } 1207 1208 @Test 1209 public void testBufferedMutatorImplWithSharedPool() throws Exception { 1210 ClusterConnection conn = createHConnection(); 1211 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1212 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1213 BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1214 1215 ht.close(); 1216 assertFalse(ap.service.isShutdown()); 1217 } 1218 1219 @Test 1220 public void testFailedPutAndNewPut() throws Exception { 1221 ClusterConnection conn = createHConnection(); 1222 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1223 BufferedMutatorParams bufferParam = 1224 createBufferedMutatorParams(ap, DUMMY_TABLE).writeBufferSize(0); 1225 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1226 1227 Put p = createPut(1, false); 1228 try { 1229 mutator.mutate(p); 1230 Assert.fail(); 1231 } catch (RetriesExhaustedWithDetailsException expected) { 1232 assertEquals(1, expected.getNumExceptions()); 1233 assertTrue(expected.getRow(0) == p); 1234 } 1235 // Let's do all the retries. 1236 ap.waitForMaximumCurrentTasks(0, null); 1237 Assert.assertEquals(0, mutator.size()); 1238 1239 // There is no global error so the new put should not fail 1240 mutator.mutate(createPut(1, true)); 1241 Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); 1242 } 1243 1244 @SuppressWarnings("SelfComparison") 1245 @Test 1246 public void testAction() { 1247 Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10); 1248 Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1249 Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1250 Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10); 1251 assertFalse(action_0.equals(action_1)); 1252 assertTrue(action_0.equals(action_0)); 1253 assertTrue(action_1.equals(action_2)); 1254 assertTrue(action_2.equals(action_1)); 1255 assertFalse(action_0.equals(new Put(Bytes.toBytes("abc")))); 1256 assertTrue(action_2.equals(action_3)); 1257 assertFalse(action_0.equals(action_3)); 1258 assertEquals(0, action_0.compareTo(action_0)); 1259 assertTrue(action_0.compareTo(action_1) < 0); 1260 assertTrue(action_1.compareTo(action_0) > 0); 1261 assertEquals(0, action_1.compareTo(action_2)); 1262 } 1263 1264 @Test 1265 public void testBatch() throws IOException, InterruptedException { 1266 ClusterConnection conn = new MyConnectionImpl(CONF); 1267 HTable ht = (HTable) conn.getTable(DUMMY_TABLE); 1268 ht.multiAp = new MyAsyncProcess(conn, CONF); 1269 1270 List<Put> puts = new ArrayList<>(7); 1271 puts.add(createPut(1, true)); 1272 puts.add(createPut(1, true)); 1273 puts.add(createPut(1, true)); 1274 puts.add(createPut(1, true)); 1275 puts.add(createPut(1, false)); // <=== the bad apple, position 4 1276 puts.add(createPut(1, true)); 1277 puts.add(createPut(1, false)); // <=== another bad apple, position 6 1278 1279 Object[] res = new Object[puts.size()]; 1280 try { 1281 ht.batch(puts, res); 1282 Assert.fail(); 1283 } catch (RetriesExhaustedException expected) { 1284 } 1285 1286 Assert.assertEquals(success, res[0]); 1287 Assert.assertEquals(success, res[1]); 1288 Assert.assertEquals(success, res[2]); 1289 Assert.assertEquals(success, res[3]); 1290 Assert.assertEquals(failure, res[4]); 1291 Assert.assertEquals(success, res[5]); 1292 Assert.assertEquals(failure, res[6]); 1293 } 1294 1295 @Test 1296 public void testErrorsServers() throws IOException { 1297 Configuration configuration = new Configuration(CONF); 1298 ClusterConnection conn = new MyConnectionImpl(configuration); 1299 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration); 1300 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1301 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1302 configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); 1303 1304 Assert.assertNotNull(ap.createServerErrorTracker()); 1305 Assert.assertTrue(ap.serverTrackerTimeout > 200L); 1306 ap.serverTrackerTimeout = 1L; 1307 1308 Put p = createPut(1, false); 1309 mutator.mutate(p); 1310 1311 try { 1312 mutator.flush(); 1313 Assert.fail(); 1314 } catch (RetriesExhaustedWithDetailsException expected) { 1315 assertEquals(1, expected.getNumExceptions()); 1316 assertTrue(expected.getRow(0) == p); 1317 } 1318 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1319 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1320 } 1321 1322 @Test 1323 public void testReadAndWriteTimeout() throws IOException { 1324 final long readTimeout = 10 * 1000; 1325 final long writeTimeout = 20 * 1000; 1326 Configuration copyConf = new Configuration(CONF); 1327 copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); 1328 copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); 1329 ClusterConnection conn = new MyConnectionImpl(copyConf); 1330 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 1331 try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) { 1332 ht.multiAp = ap; 1333 List<Get> gets = new ArrayList<>(); 1334 gets.add(new Get(DUMMY_BYTES_1)); 1335 gets.add(new Get(DUMMY_BYTES_2)); 1336 try { 1337 ht.get(gets); 1338 } catch (ClassCastException e) { 1339 // No result response on this test. 1340 } 1341 assertEquals(readTimeout, ap.previousTimeout); 1342 ap.previousTimeout = -1; 1343 1344 try { 1345 ht.existsAll(gets); 1346 } catch (ClassCastException e) { 1347 // No result response on this test. 1348 } 1349 assertEquals(readTimeout, ap.previousTimeout); 1350 ap.previousTimeout = -1; 1351 1352 List<Delete> deletes = new ArrayList<>(); 1353 deletes.add(new Delete(DUMMY_BYTES_1)); 1354 deletes.add(new Delete(DUMMY_BYTES_2)); 1355 ht.delete(deletes); 1356 assertEquals(writeTimeout, ap.previousTimeout); 1357 } 1358 } 1359 1360 @Test 1361 public void testErrors() throws IOException { 1362 ClusterConnection conn = new MyConnectionImpl(CONF); 1363 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); 1364 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1365 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1366 1367 Assert.assertNotNull(ap.createServerErrorTracker()); 1368 1369 Put p = createPut(1, true); 1370 mutator.mutate(p); 1371 1372 try { 1373 mutator.flush(); 1374 Assert.fail(); 1375 } catch (RetriesExhaustedWithDetailsException expected) { 1376 assertEquals(1, expected.getNumExceptions()); 1377 assertTrue(expected.getRow(0) == p); 1378 } 1379 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1380 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1381 } 1382 1383 @Test 1384 public void testCallQueueTooLarge() throws IOException { 1385 ClusterConnection conn = new MyConnectionImpl(CONF); 1386 AsyncProcessWithFailure ap = 1387 new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException()); 1388 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1389 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1390 Assert.assertNotNull(ap.createServerErrorTracker()); 1391 Put p = createPut(1, true); 1392 mutator.mutate(p); 1393 1394 try { 1395 mutator.flush(); 1396 Assert.fail(); 1397 } catch (RetriesExhaustedWithDetailsException expected) { 1398 assertEquals(1, expected.getNumExceptions()); 1399 assertTrue(expected.getRow(0) == p); 1400 } 1401 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1402 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1403 } 1404 1405 /** 1406 * This test simulates multiple regions on 2 servers. We should have 2 multi requests and 2 1407 * threads: 1 per server, this whatever the number of regions. 1408 */ 1409 @Test 1410 public void testThreadCreation() throws Exception { 1411 final int NB_REGS = 100; 1412 List<HRegionLocation> hrls = new ArrayList<>(NB_REGS); 1413 List<Get> gets = new ArrayList<>(NB_REGS); 1414 for (int i = 0; i < NB_REGS; i++) { 1415 HRegionInfo hri = 1416 new HRegionInfo(DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i); 1417 HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2); 1418 hrls.add(hrl); 1419 1420 Get get = new Get(Bytes.toBytes(i * 10L)); 1421 gets.add(get); 1422 } 1423 1424 MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF); 1425 MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); 1426 HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service); 1427 ht.multiAp = ap; 1428 ht.batch(gets, null); 1429 1430 Assert.assertEquals(NB_REGS, ap.nbActions.get()); 1431 Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); 1432 Assert.assertEquals("1 thread per server", 2, con.nbThreads.get()); 1433 1434 int nbReg = 0; 1435 for (int i = 0; i < NB_REGS; i++) { 1436 if (con.usedRegions[i]) nbReg++; 1437 } 1438 Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg); 1439 } 1440 1441 @Test 1442 public void testReplicaReplicaSuccess() throws Exception { 1443 // Main call takes too long so replicas succeed, except for one region w/o replicas. 1444 // One region has no replica, so the main call succeeds for it. 1445 MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); 1446 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1447 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1448 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1449 .setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(SubmittedRows.ALL).build(); 1450 AsyncRequestFuture ars = ap.submit(task); 1451 verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); 1452 Assert.assertEquals(2, ap.getReplicaCallCount()); 1453 } 1454 1455 @Test 1456 public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception { 1457 // Main call succeeds before replica calls are kicked off. 1458 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); 1459 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1460 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1461 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1462 .setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(SubmittedRows.ALL).build(); 1463 AsyncRequestFuture ars = ap.submit(task); 1464 verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); 1465 Assert.assertEquals(0, ap.getReplicaCallCount()); 1466 } 1467 1468 @Test 1469 public void testReplicaParallelCallsSucceed() throws Exception { 1470 // Either main or replica can succeed. 1471 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); 1472 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1473 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1474 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1475 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1476 AsyncRequestFuture ars = ap.submit(task); 1477 verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); 1478 long replicaCalls = ap.getReplicaCallCount(); 1479 Assert.assertTrue(replicaCalls >= 0); 1480 Assert.assertTrue(replicaCalls <= 2); 1481 } 1482 1483 @Test 1484 public void testReplicaPartialReplicaCall() throws Exception { 1485 // One server is slow, so the result for its region comes from replica, whereas 1486 // the result for other region comes from primary before replica calls happen. 1487 // There should be no replica call for that region at all. 1488 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); 1489 ap.setPrimaryCallDelay(sn2, 2000); 1490 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1491 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1492 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1493 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1494 AsyncRequestFuture ars = ap.submit(task); 1495 verifyReplicaResult(ars, RR.FALSE, RR.TRUE); 1496 Assert.assertEquals(1, ap.getReplicaCallCount()); 1497 } 1498 1499 @Test 1500 public void testReplicaMainFailsBeforeReplicaCalls() throws Exception { 1501 // Main calls fail before replica calls can start - this is currently not handled. 1502 // It would probably never happen if we can get location (due to retries), 1503 // and it would require additional synchronization. 1504 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); 1505 ap.addFailures(hri1, hri2); 1506 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1507 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1508 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1509 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1510 AsyncRequestFuture ars = ap.submit(task); 1511 verifyReplicaResult(ars, RR.FAILED, RR.FAILED); 1512 Assert.assertEquals(0, ap.getReplicaCallCount()); 1513 } 1514 1515 @Test 1516 public void testReplicaReplicaSuccessWithParallelFailures() throws Exception { 1517 // Main calls fails after replica calls start. For two-replica region, one replica call 1518 // also fails. Regardless, we get replica results for both regions. 1519 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); 1520 ap.addFailures(hri1, hri1r2, hri2); 1521 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1522 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1523 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1524 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1525 AsyncRequestFuture ars = ap.submit(task); 1526 verifyReplicaResult(ars, RR.TRUE, RR.TRUE); 1527 Assert.assertEquals(2, ap.getReplicaCallCount()); 1528 } 1529 1530 @Test 1531 public void testReplicaAllCallsFailForOneRegion() throws Exception { 1532 // For one of the region, all 3, main and replica, calls fail. For the other, replica 1533 // call fails but its exception should not be visible as it did succeed. 1534 MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); 1535 ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); 1536 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1537 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service) 1538 .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE) 1539 .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build(); 1540 AsyncRequestFuture ars = ap.submit(task); 1541 verifyReplicaResult(ars, RR.FAILED, RR.FALSE); 1542 // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 1543 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); 1544 for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) { 1545 Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow()); 1546 } 1547 } 1548 1549 private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs, 1550 int replicaMs) throws Exception { 1551 return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1); 1552 } 1553 1554 private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs, 1555 int replicaMs, int retries) throws Exception { 1556 // TODO: this is kind of timing dependent... perhaps it should detect from createCaller 1557 // that the replica call has happened and that way control the ordering. 1558 Configuration conf = new Configuration(); 1559 conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); 1560 if (retries >= 0) { 1561 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1562 } 1563 ClusterConnection conn = createHConnectionWithReplicas(new ConnectionConfiguration(conf)); 1564 MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf); 1565 ap.setCallDelays(primaryMs, replicaMs); 1566 return ap; 1567 } 1568 1569 private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) { 1570 return new BufferedMutatorParams(name).pool(ap.service).rpcTimeout(RPC_TIMEOUT) 1571 .opertationTimeout(OPERATION_TIMEOUT); 1572 } 1573 1574 private static List<Get> makeTimelineGets(byte[]... rows) { 1575 List<Get> result = new ArrayList<>(rows.length); 1576 for (byte[] row : rows) { 1577 Get get = new Get(row); 1578 get.setConsistency(Consistency.TIMELINE); 1579 result.add(get); 1580 } 1581 return result; 1582 } 1583 1584 private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception { 1585 Object[] actual = ars.getResults(); 1586 Assert.assertEquals(expected.length, actual.length); 1587 for (int i = 0; i < expected.length; ++i) { 1588 Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable)); 1589 } 1590 } 1591 1592 /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */ 1593 private enum RR { 1594 TRUE, 1595 FALSE, 1596 DONT_CARE, 1597 FAILED 1598 } 1599 1600 private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception { 1601 Object[] actuals = ars.getResults(); 1602 Assert.assertEquals(expecteds.length, actuals.length); 1603 for (int i = 0; i < expecteds.length; ++i) { 1604 Object actual = actuals[i]; 1605 RR expected = expecteds[i]; 1606 Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable); 1607 if (expected != RR.FAILED && expected != RR.DONT_CARE) { 1608 Assert.assertEquals(expected == RR.TRUE, ((Result) actual).isStale()); 1609 } 1610 } 1611 } 1612 1613 /** 1614 * @param regCnt the region: 1 to 3. 1615 * @param success if true, the put will succeed. 1616 * @return a put 1617 */ 1618 private Put createPut(int regCnt, boolean success) { 1619 Put p; 1620 if (!success) { 1621 p = new Put(FAILS); 1622 } else switch (regCnt) { 1623 case 1: 1624 p = new Put(DUMMY_BYTES_1); 1625 break; 1626 case 2: 1627 p = new Put(DUMMY_BYTES_2); 1628 break; 1629 case 3: 1630 p = new Put(DUMMY_BYTES_3); 1631 break; 1632 default: 1633 throw new IllegalArgumentException("unknown " + regCnt); 1634 } 1635 1636 p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1637 1638 return p; 1639 } 1640 1641 static class MyThreadPoolExecutor extends ThreadPoolExecutor { 1642 public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime, 1643 TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) { 1644 super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue); 1645 } 1646 1647 @Override 1648 public Future submit(Runnable runnable) { 1649 throw new OutOfMemoryError("OutOfMemory error thrown by means"); 1650 } 1651 } 1652 1653 static class AsyncProcessForThrowableCheck extends AsyncProcess { 1654 public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { 1655 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 1656 } 1657 } 1658 1659 @Test 1660 public void testUncheckedException() throws Exception { 1661 // Test the case pool.submit throws unchecked exception 1662 ClusterConnection hc = createHConnection(); 1663 MyThreadPoolExecutor myPool = 1664 new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); 1665 AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF); 1666 1667 List<Put> puts = new ArrayList<>(1); 1668 puts.add(createPut(1, true)); 1669 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(myPool).setRpcTimeout(RPC_TIMEOUT) 1670 .setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(puts) 1671 .setSubmittedRows(SubmittedRows.NORMAL).build(); 1672 ap.submit(task); 1673 Assert.assertTrue(puts.isEmpty()); 1674 } 1675 1676 /** 1677 * Below tests make sure we could use a special pause setting when retry an exception where 1678 * {@link HBaseServerException#isServerOverloaded(Throwable)} is true, see HBASE-17114 1679 */ 1680 1681 @Test 1682 public void testRetryPauseWhenServerOverloadedDueToCQTBE() throws Exception { 1683 testRetryPauseWhenServerIsOverloaded(new CallQueueTooBigException()); 1684 } 1685 1686 @Test 1687 public void testRetryPauseWhenServerOverloadedDueToCDE() throws Exception { 1688 testRetryPauseWhenServerIsOverloaded(new CallDroppedException()); 1689 } 1690 1691 private void testRetryPauseWhenServerIsOverloaded(HBaseServerException exception) 1692 throws IOException { 1693 Configuration conf = new Configuration(CONF); 1694 final long specialPause = 500L; 1695 final int retries = 1; 1696 conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, specialPause); 1697 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1698 1699 ClusterConnection conn = new MyConnectionImpl(conf); 1700 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, exception); 1701 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1702 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1703 1704 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1705 1706 Put p = createPut(1, true); 1707 mutator.mutate(p); 1708 1709 long startTime = EnvironmentEdgeManager.currentTime(); 1710 try { 1711 mutator.flush(); 1712 Assert.fail(); 1713 } catch (RetriesExhaustedWithDetailsException expected) { 1714 assertEquals(1, expected.getNumExceptions()); 1715 assertTrue(expected.getRow(0) == p); 1716 } 1717 long actualSleep = EnvironmentEdgeManager.currentTime() - startTime; 1718 long expectedSleep = 0L; 1719 for (int i = 0; i < retries; i++) { 1720 expectedSleep += ConnectionUtils.getPauseTime(specialPause, i); 1721 // Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result 1722 actualSleep += (long) (specialPause * 0.01f); 1723 } 1724 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1725 Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms", 1726 actualSleep >= expectedSleep); 1727 1728 // check and confirm normal IOE will use the normal pause 1729 final long normalPause = 1730 conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 1731 ap = new AsyncProcessWithFailure(conn, conf, new IOException()); 1732 bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1733 mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1734 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1735 mutator.mutate(p); 1736 startTime = EnvironmentEdgeManager.currentTime(); 1737 try { 1738 mutator.flush(); 1739 Assert.fail(); 1740 } catch (RetriesExhaustedWithDetailsException expected) { 1741 assertEquals(1, expected.getNumExceptions()); 1742 assertTrue(expected.getRow(0) == p); 1743 } 1744 actualSleep = EnvironmentEdgeManager.currentTime() - startTime; 1745 expectedSleep = 0L; 1746 for (int i = 0; i < retries; i++) { 1747 expectedSleep += ConnectionUtils.getPauseTime(normalPause, i); 1748 } 1749 // plus an additional pause to balance the program execution time 1750 expectedSleep += normalPause; 1751 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1752 Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); 1753 } 1754 1755 @Test 1756 public void testRetryWithExceptionClearsMetaCache() throws Exception { 1757 Configuration myConf = new Configuration(CONF); 1758 myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 1759 ClusterConnection conn = createHConnection(new ConnectionConfiguration(myConf)); 1760 1761 AsyncProcessWithFailure ap = 1762 new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test")); 1763 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1764 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1765 1766 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1767 1768 Assert.assertEquals(conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(), 1769 new RegionLocations(loc1).toString()); 1770 1771 Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any()); 1772 1773 Put p = createPut(1, true); 1774 mutator.mutate(p); 1775 1776 try { 1777 mutator.flush(); 1778 Assert.fail(); 1779 } catch (RetriesExhaustedWithDetailsException expected) { 1780 assertEquals(1, expected.getNumExceptions()); 1781 assertTrue(expected.getRow(0) == p); 1782 } 1783 1784 Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName()); 1785 } 1786 1787 @Test 1788 public void testQueueRowAccess() throws Exception { 1789 ClusterConnection conn = createHConnection(); 1790 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, 1791 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); 1792 Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1793 Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 1794 mutator.mutate(p0); 1795 BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess(); 1796 // QueueRowAccess should take all undealt mutations 1797 assertEquals(0, mutator.size()); 1798 mutator.mutate(p1); 1799 assertEquals(1, mutator.size()); 1800 BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess(); 1801 // QueueRowAccess should take all undealt mutations 1802 assertEquals(0, mutator.size()); 1803 assertEquals(1, ra0.size()); 1804 assertEquals(1, ra1.size()); 1805 Iterator<Row> iter0 = ra0.iterator(); 1806 Iterator<Row> iter1 = ra1.iterator(); 1807 assertTrue(iter0.hasNext()); 1808 assertTrue(iter1.hasNext()); 1809 // the next() will poll the mutation from inner buffer and update the buffer count 1810 assertTrue(iter0.next() == p0); 1811 assertEquals(1, mutator.getUnflushedSize()); 1812 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1813 assertTrue(iter1.next() == p1); 1814 assertEquals(0, mutator.getUnflushedSize()); 1815 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1816 assertFalse(iter0.hasNext()); 1817 assertFalse(iter1.hasNext()); 1818 // ra0 doest handle the mutation so the mutation won't be pushed back to buffer 1819 iter0.remove(); 1820 ra0.close(); 1821 assertEquals(0, mutator.size()); 1822 assertEquals(0, mutator.getUnflushedSize()); 1823 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1824 // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer 1825 ra1.close(); 1826 assertEquals(1, mutator.size()); 1827 assertEquals(1, mutator.getUnflushedSize()); 1828 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1829 } 1830}