001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034import java.util.Random; 035import java.util.Set; 036import java.util.TreeSet; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.Executors; 041import java.util.concurrent.Future; 042import java.util.concurrent.LinkedBlockingQueue; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.CallQueueTooBigException; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseClassTestRule; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.HRegionInfo; 056import org.apache.hadoop.hbase.HRegionLocation; 057import org.apache.hadoop.hbase.RegionLocations; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess; 061import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; 062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 063import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 064import org.apache.hadoop.hbase.client.coprocessor.Batch; 065import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 066import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 067import org.apache.hadoop.hbase.testclassification.ClientTests; 068import org.apache.hadoop.hbase.testclassification.LargeTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.Threads; 071import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 072import org.junit.Assert; 073import org.junit.Before; 074import org.junit.ClassRule; 075import org.junit.Test; 076import org.junit.experimental.categories.Category; 077import org.mockito.Mockito; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081@Category({ClientTests.class, LargeTests.class}) 082public class TestAsyncProcess { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestAsyncProcess.class); 087 088 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class); 089 private static final TableName DUMMY_TABLE = 090 TableName.valueOf("DUMMY_TABLE"); 091 private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1"); 092 private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); 093 private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); 094 private static final byte[] FAILS = Bytes.toBytes("FAILS"); 095 private Configuration CONF; 096 private ConnectionConfiguration CONNECTION_CONFIG; 097 private static final ServerName sn = ServerName.valueOf("s1,1,1"); 098 private static final ServerName sn2 = ServerName.valueOf("s2,2,2"); 099 private static final ServerName sn3 = ServerName.valueOf("s3,3,3"); 100 private static final HRegionInfo hri1 = 101 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); 102 private static final HRegionInfo hri2 = 103 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); 104 private static final HRegionInfo hri3 = 105 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); 106 private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn); 107 private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn); 108 private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2); 109 110 // Replica stuff 111 private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); 112 private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2); 113 private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); 114 private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), 115 new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)); 116 private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2), 117 new HRegionLocation(hri2r1, sn3)); 118 private static final RegionLocations hrls3 = 119 new RegionLocations(new HRegionLocation(hri3, sn3), null); 120 121 private static final String success = "success"; 122 private static Exception failure = new Exception("failure"); 123 124 private static final int NB_RETRIES = 3; 125 126 private int RPC_TIMEOUT; 127 private int OPERATION_TIMEOUT; 128 129 @Before 130 public void beforeEach() { 131 this.CONF = new Configuration(); 132 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); 133 this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF); 134 this.RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 135 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 136 this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 137 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 138 } 139 140 static class CountingThreadFactory implements ThreadFactory { 141 final AtomicInteger nbThreads; 142 ThreadFactory realFactory = 143 new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d") 144 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(); 145 @Override 146 public Thread newThread(Runnable r) { 147 nbThreads.incrementAndGet(); 148 return realFactory.newThread(r); 149 } 150 151 CountingThreadFactory(AtomicInteger nbThreads){ 152 this.nbThreads = nbThreads; 153 } 154 } 155 156 static class MyAsyncProcess extends AsyncProcess { 157 final AtomicInteger nbMultiResponse = new AtomicInteger(); 158 final AtomicInteger nbActions = new AtomicInteger(); 159 public List<AsyncRequestFuture> allReqs = new ArrayList<>(); 160 public AtomicInteger callsCt = new AtomicInteger(); 161 private Configuration conf; 162 163 private long previousTimeout = -1; 164 final ExecutorService service; 165 @Override 166 protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture( 167 AsyncProcessTask task, List<Action> actions, long nonceGroup) { 168 // Test HTable has tableName of null, so pass DUMMY_TABLE 169 AsyncProcessTask wrap = new AsyncProcessTask(task){ 170 @Override 171 public TableName getTableName() { 172 return DUMMY_TABLE; 173 } 174 }; 175 AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<>( 176 wrap, actions, nonceGroup, this); 177 allReqs.add(r); 178 return r; 179 } 180 181 public MyAsyncProcess(ClusterConnection hc, Configuration conf) { 182 super(hc, conf, 183 new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 184 service = Executors.newFixedThreadPool(5); 185 this.conf = conf; 186 } 187 188 public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { 189 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 190 service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, 191 new SynchronousQueue<>(), new CountingThreadFactory(nbThreads)); 192 } 193 194 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, 195 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, 196 boolean needResults) throws InterruptedIOException { 197 AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) 198 .setPool(pool == null ? service : pool) 199 .setTableName(tableName) 200 .setRowAccess(rows) 201 .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) 202 .setNeedResults(needResults) 203 .setRpcTimeout(conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, 204 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)) 205 .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 206 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)) 207 .build(); 208 return submit(task); 209 } 210 211 public <CResult> AsyncRequestFuture submit(TableName tableName, 212 final List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, 213 boolean needResults) throws InterruptedIOException { 214 return submit(null, tableName, rows, atLeastOne, callback, needResults); 215 } 216 217 @Override 218 public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task) 219 throws InterruptedIOException { 220 previousTimeout = task.getRpcTimeout(); 221 // We use results in tests to check things, so override to always save them. 222 AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) { 223 @Override 224 public boolean getNeedResults() { 225 return true; 226 } 227 }; 228 return super.submit(wrap); 229 } 230 231 @Override 232 protected RpcRetryingCaller<AbstractResponse> createCaller( 233 CancellableRegionServerCallable callable, int rpcTimeout) { 234 callsCt.incrementAndGet(); 235 MultiServerCallable callable1 = (MultiServerCallable) callable; 236 final MultiResponse mr = createMultiResponse( 237 callable1.getMulti(), nbMultiResponse, nbActions, 238 new ResponseGenerator() { 239 @Override 240 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 241 if (Arrays.equals(FAILS, a.getAction().getRow())) { 242 mr.add(regionName, a.getOriginalIndex(), failure); 243 } else { 244 mr.add(regionName, a.getOriginalIndex(), success); 245 } 246 } 247 }); 248 249 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { 250 @Override 251 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 252 int callTimeout) 253 throws IOException, RuntimeException { 254 try { 255 // sleep one second in order for threadpool to start another thread instead of reusing 256 // existing one. 257 Thread.sleep(1000); 258 } catch (InterruptedException e) { 259 // ignore error 260 } 261 return mr; 262 } 263 }; 264 } 265 266 267 } 268 269 static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { 270 private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); 271 public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, 272 long nonceGroup, AsyncProcess asyncProcess) { 273 super(task, actions, nonceGroup, asyncProcess); 274 } 275 276 @Override 277 protected void updateStats(ServerName server, MultiResponse resp) { 278 // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. 279 } 280 281 Map<ServerName, List<Long>> getRequestHeapSize() { 282 return heapSizesByServer; 283 } 284 285 @Override 286 SingleServerRequestRunnable createSingleServerRequest( 287 MultiAction multiAction, int numAttempt, ServerName server, 288 Set<CancellableRegionServerCallable> callsInProgress) { 289 SingleServerRequestRunnable rq = new SingleServerRequestRunnable( 290 multiAction, numAttempt, server, callsInProgress); 291 List<Long> heapCount = heapSizesByServer.get(server); 292 if (heapCount == null) { 293 heapCount = new ArrayList<>(); 294 heapSizesByServer.put(server, heapCount); 295 } 296 heapCount.add(heapSizeOf(multiAction)); 297 return rq; 298 } 299 300 private long heapSizeOf(MultiAction multiAction) { 301 return multiAction.actions.values().stream() 302 .flatMap(v -> v.stream()) 303 .map(action -> action.getAction()) 304 .filter(row -> row instanceof Mutation) 305 .mapToLong(row -> ((Mutation) row).heapSize()) 306 .sum(); 307 } 308 } 309 310 static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{ 311 312 private final IOException e; 313 314 public CallerWithFailure(IOException e) { 315 super(100, 500, 100, 9); 316 this.e = e; 317 } 318 319 @Override 320 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 321 int callTimeout) 322 throws IOException, RuntimeException { 323 throw e; 324 } 325 } 326 327 328 static class AsyncProcessWithFailure extends MyAsyncProcess { 329 330 private final IOException ioe; 331 332 public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) { 333 super(hc, conf); 334 this.ioe = ioe; 335 serverTrackerTimeout = 1L; 336 } 337 338 @Override 339 protected RpcRetryingCaller<AbstractResponse> createCaller( 340 CancellableRegionServerCallable callable, int rpcTimeout) { 341 callsCt.incrementAndGet(); 342 return new CallerWithFailure(ioe); 343 } 344 } 345 346 /** 347 * Make the backoff time always different on each call. 348 */ 349 static class MyClientBackoffPolicy implements ClientBackoffPolicy { 350 private final Map<ServerName, AtomicInteger> count = new HashMap<>(); 351 @Override 352 public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { 353 AtomicInteger inc = count.get(serverName); 354 if (inc == null) { 355 inc = new AtomicInteger(0); 356 count.put(serverName, inc); 357 } 358 return inc.getAndIncrement(); 359 } 360 } 361 362 static class MyAsyncProcessWithReplicas extends MyAsyncProcess { 363 private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator()); 364 private long primarySleepMs = 0, replicaSleepMs = 0; 365 private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>(); 366 private final AtomicLong replicaCalls = new AtomicLong(0); 367 368 public void addFailures(RegionInfo... hris) { 369 for (RegionInfo hri : hris) { 370 failures.add(hri.getRegionName()); 371 } 372 } 373 374 public long getReplicaCallCount() { 375 return replicaCalls.get(); 376 } 377 378 public void setPrimaryCallDelay(ServerName server, long primaryMs) { 379 customPrimarySleepMs.put(server, primaryMs); 380 } 381 382 public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) { 383 super(hc, conf); 384 } 385 386 public void setCallDelays(long primaryMs, long replicaMs) { 387 this.primarySleepMs = primaryMs; 388 this.replicaSleepMs = replicaMs; 389 } 390 391 @Override 392 protected RpcRetryingCaller<AbstractResponse> createCaller( 393 CancellableRegionServerCallable payloadCallable, int rpcTimeout) { 394 MultiServerCallable callable = (MultiServerCallable) payloadCallable; 395 final MultiResponse mr = createMultiResponse( 396 callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { 397 @Override 398 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 399 if (failures.contains(regionName)) { 400 mr.add(regionName, a.getOriginalIndex(), failure); 401 } else { 402 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId()); 403 mr.add(regionName, a.getOriginalIndex(), 404 Result.create(new Cell[0], null, isStale)); 405 } 406 } 407 }); 408 // Currently AsyncProcess either sends all-replica, or all-primary request. 409 final boolean isDefault = RegionReplicaUtil.isDefaultReplica( 410 callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId()); 411 final ServerName server = ((MultiServerCallable)callable).getServerName(); 412 String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " 413 + callable.getMulti().actions.size() + " entries: "; 414 for (byte[] region : callable.getMulti().actions.keySet()) { 415 debugMsg += "[" + Bytes.toStringBinary(region) + "], "; 416 } 417 LOG.debug(debugMsg); 418 if (!isDefault) { 419 replicaCalls.incrementAndGet(); 420 } 421 422 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { 423 @Override 424 public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 425 int callTimeout) 426 throws IOException, RuntimeException { 427 long sleep = -1; 428 if (isDefault) { 429 Long customSleep = customPrimarySleepMs.get(server); 430 sleep = (customSleep == null ? primarySleepMs : customSleep.longValue()); 431 } else { 432 sleep = replicaSleepMs; 433 } 434 if (sleep != 0) { 435 try { 436 Thread.sleep(sleep); 437 } catch (InterruptedException e) { 438 } 439 } 440 return mr; 441 } 442 }; 443 } 444 } 445 446 static MultiResponse createMultiResponse(final MultiAction multi, 447 AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) { 448 final MultiResponse mr = new MultiResponse(); 449 nbMultiResponse.incrementAndGet(); 450 for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) { 451 byte[] regionName = entry.getKey(); 452 for (Action a : entry.getValue()) { 453 nbActions.incrementAndGet(); 454 gen.addResponse(mr, regionName, a); 455 } 456 } 457 return mr; 458 } 459 460 private static interface ResponseGenerator { 461 void addResponse(final MultiResponse mr, byte[] regionName, Action a); 462 } 463 464 /** 465 * Returns our async process. 466 */ 467 static class MyConnectionImpl extends ConnectionImplementation { 468 public static class TestRegistry extends DoNothingConnectionRegistry { 469 470 public TestRegistry(Configuration conf) { 471 super(conf); 472 } 473 474 @Override 475 public CompletableFuture<String> getClusterId() { 476 return CompletableFuture.completedFuture("testClusterId"); 477 } 478 } 479 480 final AtomicInteger nbThreads = new AtomicInteger(0); 481 482 protected MyConnectionImpl(Configuration conf) throws IOException { 483 super(setupConf(conf), null, null); 484 } 485 486 private static Configuration setupConf(Configuration conf) { 487 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 488 TestRegistry.class, ConnectionRegistry.class); 489 return conf; 490 } 491 492 @Override 493 public RegionLocations locateRegion(TableName tableName, 494 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { 495 return new RegionLocations(loc1); 496 } 497 498 @Override 499 public boolean hasCellBlockSupport() { 500 return false; 501 } 502 } 503 504 /** 505 * Returns our async process. 506 */ 507 static class MyConnectionImpl2 extends MyConnectionImpl { 508 List<HRegionLocation> hrl; 509 final boolean usedRegions[]; 510 511 protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException { 512 super(conf); 513 this.hrl = hrl; 514 this.usedRegions = new boolean[hrl.size()]; 515 } 516 517 @Override 518 public RegionLocations locateRegion(TableName tableName, 519 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { 520 int i = 0; 521 for (HRegionLocation hr : hrl){ 522 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { 523 usedRegions[i] = true; 524 return new RegionLocations(hr); 525 } 526 i++; 527 } 528 return null; 529 } 530 } 531 @Test 532 public void testListRowAccess() { 533 int count = 10; 534 List<String> values = new LinkedList<>(); 535 for (int i = 0; i != count; ++i) { 536 values.add(String.valueOf(i)); 537 } 538 539 ListRowAccess<String> taker = new ListRowAccess(values); 540 assertEquals(count, taker.size()); 541 542 int restoreCount = 0; 543 int takeCount = 0; 544 Iterator<String> it = taker.iterator(); 545 while (it.hasNext()) { 546 String v = it.next(); 547 assertEquals(String.valueOf(takeCount), v); 548 ++takeCount; 549 it.remove(); 550 if (Math.random() >= 0.5) { 551 break; 552 } 553 } 554 assertEquals(count, taker.size() + takeCount); 555 556 it = taker.iterator(); 557 while (it.hasNext()) { 558 String v = it.next(); 559 assertEquals(String.valueOf(takeCount), v); 560 ++takeCount; 561 it.remove(); 562 } 563 assertEquals(0, taker.size()); 564 assertEquals(count, takeCount); 565 } 566 private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) { 567 if (putSizePerServer <= maxHeapSizePerRequest) { 568 return 1; 569 } else if (putSizePerServer % maxHeapSizePerRequest == 0) { 570 return putSizePerServer / maxHeapSizePerRequest; 571 } else { 572 return putSizePerServer / maxHeapSizePerRequest + 1; 573 } 574 } 575 576 @Test 577 public void testSubmitSameSizeOfRequest() throws Exception { 578 long writeBuffer = 2 * 1024 * 1024; 579 long putsHeapSize = writeBuffer; 580 doSubmitRequest(writeBuffer, putsHeapSize); 581 } 582 583 @Test 584 public void testSubmitLargeRequestWithUnlimitedSize() throws Exception { 585 long maxHeapSizePerRequest = Long.MAX_VALUE; 586 long putsHeapSize = 2 * 1024 * 1024; 587 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 588 } 589 590 @Test 591 public void testSubmitRandomSizeRequest() throws Exception { 592 Random rn = new Random(); 593 final long limit = 10 * 1024 * 1024; 594 final int requestCount = 1 + (int) (rn.nextDouble() * 3); 595 long n = rn.nextLong(); 596 if (n < 0) { 597 n = -n; 598 } else if (n == 0) { 599 n = 1; 600 } 601 long putsHeapSize = n % limit; 602 long maxHeapSizePerRequest = putsHeapSize / requestCount; 603 LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest + 604 ", putsHeapSize=" + putsHeapSize); 605 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 606 } 607 608 @Test 609 public void testSubmitSmallRequest() throws Exception { 610 long maxHeapSizePerRequest = 2 * 1024 * 1024; 611 long putsHeapSize = 100; 612 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 613 } 614 615 @Test 616 public void testSubmitLargeRequest() throws Exception { 617 long maxHeapSizePerRequest = 2 * 1024 * 1024; 618 long putsHeapSize = maxHeapSizePerRequest * 2; 619 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 620 } 621 622 private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception { 623 ClusterConnection conn = createHConnection(); 624 final String defaultClazz = 625 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 626 final long defaultHeapSizePerRequest = conn.getConfiguration().getLong( 627 SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 628 SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); 629 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 630 SimpleRequestController.class.getName()); 631 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 632 maxHeapSizePerRequest); 633 634 // sn has two regions 635 long putSizeSN = 0; 636 long putSizeSN2 = 0; 637 List<Put> puts = new ArrayList<>(); 638 while ((putSizeSN + putSizeSN2) <= putsHeapSize) { 639 Put put1 = new Put(DUMMY_BYTES_1); 640 put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 641 Put put2 = new Put(DUMMY_BYTES_2); 642 put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 643 Put put3 = new Put(DUMMY_BYTES_3); 644 put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3); 645 putSizeSN += (put1.heapSize() + put2.heapSize()); 646 putSizeSN2 += put3.heapSize(); 647 puts.add(put1); 648 puts.add(put2); 649 puts.add(put3); 650 } 651 652 int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest); 653 int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest); 654 LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN 655 + ", putSizeSN2:" + putSizeSN2 656 + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest 657 + ", minCountSnRequest:" + minCountSnRequest 658 + ", minCountSn2Request:" + minCountSn2Request); 659 660 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 661 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 662 try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) { 663 mutator.mutate(puts); 664 mutator.flush(); 665 List<AsyncRequestFuture> reqs = ap.allReqs; 666 667 int actualSnReqCount = 0; 668 int actualSn2ReqCount = 0; 669 for (AsyncRequestFuture req : reqs) { 670 if (!(req instanceof AsyncRequestFutureImpl)) { 671 continue; 672 } 673 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 674 if (ars.getRequestHeapSize().containsKey(sn)) { 675 ++actualSnReqCount; 676 } 677 if (ars.getRequestHeapSize().containsKey(sn2)) { 678 ++actualSn2ReqCount; 679 } 680 } 681 // If the server is busy, the actual count may be incremented. 682 assertEquals(true, minCountSnRequest <= actualSnReqCount); 683 assertEquals(true, minCountSn2Request <= actualSn2ReqCount); 684 Map<ServerName, Long> sizePerServers = new HashMap<>(); 685 for (AsyncRequestFuture req : reqs) { 686 if (!(req instanceof AsyncRequestFutureImpl)) { 687 continue; 688 } 689 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 690 Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); 691 for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { 692 long sum = 0; 693 for (long size : entry.getValue()) { 694 assertEquals(true, size <= maxHeapSizePerRequest); 695 sum += size; 696 } 697 assertEquals(true, sum <= maxHeapSizePerRequest); 698 long value = sizePerServers.getOrDefault(entry.getKey(), 0L); 699 sizePerServers.put(entry.getKey(), value + sum); 700 } 701 } 702 assertEquals(true, sizePerServers.containsKey(sn)); 703 assertEquals(true, sizePerServers.containsKey(sn2)); 704 assertEquals(false, sizePerServers.containsKey(sn3)); 705 assertEquals(putSizeSN, (long) sizePerServers.get(sn)); 706 assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); 707 } 708 // restore config. 709 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 710 defaultHeapSizePerRequest); 711 if (defaultClazz != null) { 712 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 713 defaultClazz); 714 } 715 } 716 717 @Test 718 public void testSubmit() throws Exception { 719 ClusterConnection hc = createHConnection(); 720 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 721 722 List<Put> puts = new ArrayList<>(1); 723 puts.add(createPut(1, true)); 724 725 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 726 Assert.assertTrue(puts.isEmpty()); 727 } 728 729 @Test 730 public void testSubmitWithCB() throws Exception { 731 ClusterConnection hc = createHConnection(); 732 final AtomicInteger updateCalled = new AtomicInteger(0); 733 Batch.Callback<Object> cb = new Batch.Callback<Object>() { 734 @Override 735 public void update(byte[] region, byte[] row, Object result) { 736 updateCalled.incrementAndGet(); 737 } 738 }; 739 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 740 741 List<Put> puts = new ArrayList<>(1); 742 puts.add(createPut(1, true)); 743 744 final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false); 745 Assert.assertTrue(puts.isEmpty()); 746 ars.waitUntilDone(); 747 Assert.assertEquals(1, updateCalled.get()); 748 } 749 750 @Test 751 public void testSubmitBusyRegion() throws Exception { 752 ClusterConnection conn = createHConnection(); 753 final String defaultClazz = 754 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 755 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 756 SimpleRequestController.class.getName()); 757 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 758 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 759 List<Put> puts = new ArrayList<>(1); 760 puts.add(createPut(1, true)); 761 762 for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) { 763 ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 764 } 765 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 766 Assert.assertEquals(puts.size(), 1); 767 768 ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 769 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 770 Assert.assertEquals(0, puts.size()); 771 if (defaultClazz != null) { 772 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 773 defaultClazz); 774 } 775 } 776 777 778 @Test 779 public void testSubmitBusyRegionServer() throws Exception { 780 ClusterConnection conn = createHConnection(); 781 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 782 final String defaultClazz = 783 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 784 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 785 SimpleRequestController.class.getName()); 786 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 787 controller.taskCounterPerServer.put(sn2, 788 new AtomicInteger(controller.maxConcurrentTasksPerServer)); 789 790 List<Put> puts = new ArrayList<>(4); 791 puts.add(createPut(1, true)); 792 puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy 793 puts.add(createPut(1, true)); // <== this one will make it, the region is already in 794 puts.add(createPut(2, true)); // <== new region, but the rs is ok 795 796 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 797 Assert.assertEquals(" puts=" + puts, 1, puts.size()); 798 799 controller.taskCounterPerServer.put(sn2, 800 new AtomicInteger(controller.maxConcurrentTasksPerServer - 1)); 801 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 802 Assert.assertTrue(puts.isEmpty()); 803 if (defaultClazz != null) { 804 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 805 defaultClazz); 806 } 807 } 808 809 @Test 810 public void testFail() throws Exception { 811 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 812 813 List<Put> puts = new ArrayList<>(1); 814 Put p = createPut(1, false); 815 puts.add(p); 816 817 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 818 Assert.assertEquals(0, puts.size()); 819 ars.waitUntilDone(); 820 verifyResult(ars, false); 821 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 822 823 Assert.assertEquals(1, ars.getErrors().exceptions.size()); 824 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 825 failure.equals(ars.getErrors().exceptions.get(0))); 826 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 827 failure.equals(ars.getErrors().exceptions.get(0))); 828 829 Assert.assertEquals(1, ars.getFailedOperations().size()); 830 Assert.assertTrue("was: " + ars.getFailedOperations().get(0), 831 p.equals(ars.getFailedOperations().get(0))); 832 } 833 834 835 @Test 836 public void testSubmitTrue() throws IOException { 837 ClusterConnection conn = createHConnection(); 838 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 839 final String defaultClazz = 840 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 841 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 842 SimpleRequestController.class.getName()); 843 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 844 controller.tasksInProgress.incrementAndGet(); 845 final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion); 846 controller.taskCounterPerRegion.put(hri1.getRegionName(), ai); 847 848 final AtomicBoolean checkPoint = new AtomicBoolean(false); 849 final AtomicBoolean checkPoint2 = new AtomicBoolean(false); 850 851 Thread t = new Thread(){ 852 @Override 853 public void run(){ 854 Threads.sleep(1000); 855 Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent 856 ai.decrementAndGet(); 857 controller.tasksInProgress.decrementAndGet(); 858 checkPoint2.set(true); 859 } 860 }; 861 862 List<Put> puts = new ArrayList<>(1); 863 Put p = createPut(1, true); 864 puts.add(p); 865 866 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 867 Assert.assertFalse(puts.isEmpty()); 868 869 t.start(); 870 871 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 872 Assert.assertTrue(puts.isEmpty()); 873 874 checkPoint.set(true); 875 while (!checkPoint2.get()){ 876 Threads.sleep(1); 877 } 878 if (defaultClazz != null) { 879 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 880 defaultClazz); 881 } 882 } 883 884 @Test 885 public void testFailAndSuccess() throws Exception { 886 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 887 888 List<Put> puts = new ArrayList<>(3); 889 puts.add(createPut(1, false)); 890 puts.add(createPut(1, true)); 891 puts.add(createPut(1, true)); 892 893 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 894 Assert.assertTrue(puts.isEmpty()); 895 ars.waitUntilDone(); 896 verifyResult(ars, false, true, true); 897 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 898 ap.callsCt.set(0); 899 Assert.assertEquals(1, ars.getErrors().actions.size()); 900 901 puts.add(createPut(1, true)); 902 // Wait for AP to be free. While ars might have the result, ap counters are decreased later. 903 ap.waitForMaximumCurrentTasks(0, null); 904 ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 905 Assert.assertEquals(0, puts.size()); 906 ars.waitUntilDone(); 907 Assert.assertEquals(1, ap.callsCt.get()); 908 verifyResult(ars, true); 909 } 910 911 @Test 912 public void testFlush() throws Exception { 913 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 914 915 List<Put> puts = new ArrayList<>(3); 916 puts.add(createPut(1, false)); 917 puts.add(createPut(1, true)); 918 puts.add(createPut(1, true)); 919 920 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 921 ars.waitUntilDone(); 922 verifyResult(ars, false, true, true); 923 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 924 925 Assert.assertEquals(1, ars.getFailedOperations().size()); 926 } 927 928 @Test 929 public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { 930 ClusterConnection hc = createHConnection(); 931 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 932 testTaskCount(ap); 933 } 934 935 @Test 936 public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { 937 Configuration copyConf = new Configuration(CONF); 938 copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); 939 MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); 940 ClusterConnection conn = createHConnection(); 941 Mockito.when(conn.getConfiguration()).thenReturn(copyConf); 942 Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); 943 Mockito.when(conn.getBackoffPolicy()).thenReturn(bp); 944 final String defaultClazz = 945 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 946 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 947 SimpleRequestController.class.getName()); 948 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 949 testTaskCount(ap); 950 if (defaultClazz != null) { 951 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 952 defaultClazz); 953 } 954 } 955 956 private void testTaskCount(MyAsyncProcess ap) 957 throws InterruptedIOException, InterruptedException { 958 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 959 List<Put> puts = new ArrayList<>(); 960 for (int i = 0; i != 3; ++i) { 961 puts.add(createPut(1, true)); 962 puts.add(createPut(2, true)); 963 puts.add(createPut(3, true)); 964 } 965 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 966 ap.waitForMaximumCurrentTasks(0, null); 967 // More time to wait if there are incorrect task count. 968 TimeUnit.SECONDS.sleep(1); 969 assertEquals(0, controller.tasksInProgress.get()); 970 for (AtomicInteger count : controller.taskCounterPerRegion.values()) { 971 assertEquals(0, count.get()); 972 } 973 for (AtomicInteger count : controller.taskCounterPerServer.values()) { 974 assertEquals(0, count.get()); 975 } 976 } 977 978 @Test 979 public void testMaxTask() throws Exception { 980 ClusterConnection conn = createHConnection(); 981 final String defaultClazz = 982 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 983 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 984 SimpleRequestController.class.getName()); 985 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 986 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 987 988 989 for (int i = 0; i < 1000; i++) { 990 ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 991 } 992 993 final Thread myThread = Thread.currentThread(); 994 995 Thread t = new Thread() { 996 @Override 997 public void run() { 998 Threads.sleep(2000); 999 myThread.interrupt(); 1000 } 1001 }; 1002 1003 List<Put> puts = new ArrayList<>(1); 1004 puts.add(createPut(1, true)); 1005 1006 t.start(); 1007 1008 try { 1009 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 1010 Assert.fail("We should have been interrupted."); 1011 } catch (InterruptedIOException expected) { 1012 } 1013 1014 final long sleepTime = 2000; 1015 1016 Thread t2 = new Thread() { 1017 @Override 1018 public void run() { 1019 Threads.sleep(sleepTime); 1020 while (controller.tasksInProgress.get() > 0) { 1021 ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 1022 } 1023 } 1024 }; 1025 t2.start(); 1026 1027 long start = System.currentTimeMillis(); 1028 ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false); 1029 long end = System.currentTimeMillis(); 1030 1031 //Adds 100 to secure us against approximate timing. 1032 Assert.assertTrue(start + 100L + sleepTime > end); 1033 if (defaultClazz != null) { 1034 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 1035 defaultClazz); 1036 } 1037 } 1038 1039 private ClusterConnection createHConnection() throws IOException { 1040 ClusterConnection hc = createHConnectionCommon(); 1041 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); 1042 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); 1043 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); 1044 Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), 1045 Mockito.anyBoolean())).thenReturn(Arrays.asList(loc1, loc2, loc3)); 1046 setMockLocation(hc, FAILS, new RegionLocations(loc2)); 1047 return hc; 1048 } 1049 1050 private ClusterConnection createHConnectionWithReplicas() throws IOException { 1051 ClusterConnection hc = createHConnectionCommon(); 1052 setMockLocation(hc, DUMMY_BYTES_1, hrls1); 1053 setMockLocation(hc, DUMMY_BYTES_2, hrls2); 1054 setMockLocation(hc, DUMMY_BYTES_3, hrls3); 1055 List<HRegionLocation> locations = new ArrayList<>(); 1056 for (HRegionLocation loc : hrls1.getRegionLocations()) { 1057 locations.add(loc); 1058 } 1059 for (HRegionLocation loc : hrls2.getRegionLocations()) { 1060 locations.add(loc); 1061 } 1062 for (HRegionLocation loc : hrls3.getRegionLocations()) { 1063 locations.add(loc); 1064 } 1065 Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), 1066 Mockito.anyBoolean())).thenReturn(locations); 1067 return hc; 1068 } 1069 1070 private static void setMockLocation(ClusterConnection hc, byte[] row, 1071 RegionLocations result) throws IOException { 1072 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), 1073 Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); 1074 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), 1075 Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); 1076 } 1077 1078 private ClusterConnection createHConnectionCommon() { 1079 ClusterConnection hc = Mockito.mock(ClusterConnection.class); 1080 NonceGenerator ng = Mockito.mock(NonceGenerator.class); 1081 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); 1082 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); 1083 Mockito.when(hc.getConfiguration()).thenReturn(CONF); 1084 Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); 1085 return hc; 1086 } 1087 1088 @Test 1089 public void testHTablePutSuccess() throws Exception { 1090 ClusterConnection conn = createHConnection(); 1091 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1092 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1093 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1094 1095 Put put = createPut(1, true); 1096 1097 Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), 1098 ht.getWriteBufferSize()); 1099 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1100 ht.mutate(put); 1101 ht.flush(); 1102 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1103 } 1104 1105 @Test 1106 public void testSettingWriteBufferPeriodicFlushParameters() throws Exception { 1107 ClusterConnection conn = createHConnection(); 1108 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1109 1110 checkPeriodicFlushParameters(conn, ap, 1111 1234, 1234, 1112 1234, 1234); 1113 checkPeriodicFlushParameters(conn, ap, 1114 0, 0, 1115 0, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1116 checkPeriodicFlushParameters(conn, ap, 1117 -1234, 0, 1118 -1234, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1119 checkPeriodicFlushParameters(conn, ap, 1120 1, 1, 1121 1, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1122 } 1123 1124 private void checkPeriodicFlushParameters(ClusterConnection conn, 1125 MyAsyncProcess ap, 1126 long setTO, long expectTO, 1127 long setTT, long expectTT 1128 ) { 1129 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1130 1131 // The BufferedMutatorParams does nothing with the value 1132 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO); 1133 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT); 1134 Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs()); 1135 Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs()); 1136 1137 // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams) 1138 BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap); 1139 Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs()); 1140 Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs()); 1141 1142 // The BufferedMutatorImpl corrects illegal values (direct via setter) 1143 BufferedMutatorImpl ht2 = 1144 new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap); 1145 ht2.setWriteBufferPeriodicFlush(setTO, setTT); 1146 Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs()); 1147 Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs()); 1148 1149 } 1150 1151 @Test 1152 public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception { 1153 ClusterConnection conn = createHConnection(); 1154 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1155 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1156 1157 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP 1158 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms 1159 bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record 1160 1161 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1162 1163 // Verify if BufferedMutator has the right settings. 1164 Assert.assertEquals(10000, ht.getWriteBufferSize()); 1165 Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs()); 1166 Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, 1167 ht.getWriteBufferPeriodicFlushTimerTickMs()); 1168 1169 Put put = createPut(1, true); 1170 1171 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1172 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1173 1174 // ----- Insert, flush immediately, MUST NOT flush automatically 1175 ht.mutate(put); 1176 ht.flush(); 1177 1178 Thread.sleep(1000); 1179 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1180 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1181 1182 // ----- Insert, NO flush, MUST flush automatically 1183 ht.mutate(put); 1184 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1185 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1186 1187 // The timerTick should fire every 100ms, so after twice that we must have 1188 // seen at least 1 tick and we should see an automatic flush 1189 Thread.sleep(200); 1190 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1191 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1192 1193 // Ensure it does not flush twice 1194 Thread.sleep(200); 1195 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1196 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1197 1198 // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically 1199 ht.disableWriteBufferPeriodicFlush(); 1200 ht.mutate(put); 1201 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1202 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1203 1204 // Wait for at least 1 timerTick, we should see NO flushes. 1205 Thread.sleep(200); 1206 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1207 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1208 1209 // Reenable periodic flushing, a flush seems to take about 1 second 1210 // so we wait for 2 seconds and it should have finished the flush. 1211 ht.setWriteBufferPeriodicFlush(1, 100); 1212 Thread.sleep(2000); 1213 Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes()); 1214 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1215 } 1216 1217 1218 @Test 1219 public void testBufferedMutatorImplWithSharedPool() throws Exception { 1220 ClusterConnection conn = createHConnection(); 1221 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1222 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1223 BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1224 1225 ht.close(); 1226 assertFalse(ap.service.isShutdown()); 1227 } 1228 1229 @Test 1230 public void testFailedPutAndNewPut() throws Exception { 1231 ClusterConnection conn = createHConnection(); 1232 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1233 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE) 1234 .writeBufferSize(0); 1235 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1236 1237 Put p = createPut(1, false); 1238 try { 1239 mutator.mutate(p); 1240 Assert.fail(); 1241 } catch (RetriesExhaustedWithDetailsException expected) { 1242 assertEquals(1, expected.getNumExceptions()); 1243 assertTrue(expected.getRow(0) == p); 1244 } 1245 // Let's do all the retries. 1246 ap.waitForMaximumCurrentTasks(0, null); 1247 Assert.assertEquals(0, mutator.size()); 1248 1249 // There is no global error so the new put should not fail 1250 mutator.mutate(createPut(1, true)); 1251 Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); 1252 } 1253 1254 @SuppressWarnings("SelfComparison") 1255 @Test 1256 public void testAction() { 1257 Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10); 1258 Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1259 Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1260 Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10); 1261 assertFalse(action_0.equals(action_1)); 1262 assertTrue(action_0.equals(action_0)); 1263 assertTrue(action_1.equals(action_2)); 1264 assertTrue(action_2.equals(action_1)); 1265 assertFalse(action_0.equals(new Put(Bytes.toBytes("abc")))); 1266 assertTrue(action_2.equals(action_3)); 1267 assertFalse(action_0.equals(action_3)); 1268 assertEquals(0, action_0.compareTo(action_0)); 1269 assertTrue(action_0.compareTo(action_1) < 0); 1270 assertTrue(action_1.compareTo(action_0) > 0); 1271 assertEquals(0, action_1.compareTo(action_2)); 1272 } 1273 1274 @Test 1275 public void testBatch() throws IOException, InterruptedException { 1276 ClusterConnection conn = new MyConnectionImpl(CONF); 1277 HTable ht = (HTable) conn.getTable(DUMMY_TABLE); 1278 ht.multiAp = new MyAsyncProcess(conn, CONF); 1279 1280 List<Put> puts = new ArrayList<>(7); 1281 puts.add(createPut(1, true)); 1282 puts.add(createPut(1, true)); 1283 puts.add(createPut(1, true)); 1284 puts.add(createPut(1, true)); 1285 puts.add(createPut(1, false)); // <=== the bad apple, position 4 1286 puts.add(createPut(1, true)); 1287 puts.add(createPut(1, false)); // <=== another bad apple, position 6 1288 1289 Object[] res = new Object[puts.size()]; 1290 try { 1291 ht.batch(puts, res); 1292 Assert.fail(); 1293 } catch (RetriesExhaustedException expected) { 1294 } 1295 1296 Assert.assertEquals(success, res[0]); 1297 Assert.assertEquals(success, res[1]); 1298 Assert.assertEquals(success, res[2]); 1299 Assert.assertEquals(success, res[3]); 1300 Assert.assertEquals(failure, res[4]); 1301 Assert.assertEquals(success, res[5]); 1302 Assert.assertEquals(failure, res[6]); 1303 } 1304 @Test 1305 public void testErrorsServers() throws IOException { 1306 Configuration configuration = new Configuration(CONF); 1307 ClusterConnection conn = new MyConnectionImpl(configuration); 1308 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration); 1309 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1310 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1311 configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); 1312 1313 Assert.assertNotNull(ap.createServerErrorTracker()); 1314 Assert.assertTrue(ap.serverTrackerTimeout > 200L); 1315 ap.serverTrackerTimeout = 1L; 1316 1317 Put p = createPut(1, false); 1318 mutator.mutate(p); 1319 1320 try { 1321 mutator.flush(); 1322 Assert.fail(); 1323 } catch (RetriesExhaustedWithDetailsException expected) { 1324 assertEquals(1, expected.getNumExceptions()); 1325 assertTrue(expected.getRow(0) == p); 1326 } 1327 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1328 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1329 } 1330 1331 @Test 1332 public void testReadAndWriteTimeout() throws IOException { 1333 final long readTimeout = 10 * 1000; 1334 final long writeTimeout = 20 * 1000; 1335 Configuration copyConf = new Configuration(CONF); 1336 copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); 1337 copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); 1338 ClusterConnection conn = new MyConnectionImpl(copyConf); 1339 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 1340 try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) { 1341 ht.multiAp = ap; 1342 List<Get> gets = new LinkedList<>(); 1343 gets.add(new Get(DUMMY_BYTES_1)); 1344 gets.add(new Get(DUMMY_BYTES_2)); 1345 try { 1346 ht.get(gets); 1347 } catch (ClassCastException e) { 1348 // No result response on this test. 1349 } 1350 assertEquals(readTimeout, ap.previousTimeout); 1351 ap.previousTimeout = -1; 1352 1353 try { 1354 ht.existsAll(gets); 1355 } catch (ClassCastException e) { 1356 // No result response on this test. 1357 } 1358 assertEquals(readTimeout, ap.previousTimeout); 1359 ap.previousTimeout = -1; 1360 1361 List<Delete> deletes = new LinkedList<>(); 1362 deletes.add(new Delete(DUMMY_BYTES_1)); 1363 deletes.add(new Delete(DUMMY_BYTES_2)); 1364 ht.delete(deletes); 1365 assertEquals(writeTimeout, ap.previousTimeout); 1366 } 1367 } 1368 1369 @Test 1370 public void testErrors() throws IOException { 1371 ClusterConnection conn = new MyConnectionImpl(CONF); 1372 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); 1373 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1374 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1375 1376 Assert.assertNotNull(ap.createServerErrorTracker()); 1377 1378 Put p = createPut(1, true); 1379 mutator.mutate(p); 1380 1381 try { 1382 mutator.flush(); 1383 Assert.fail(); 1384 } catch (RetriesExhaustedWithDetailsException expected) { 1385 assertEquals(1, expected.getNumExceptions()); 1386 assertTrue(expected.getRow(0) == p); 1387 } 1388 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1389 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1390 } 1391 1392 1393 @Test 1394 public void testCallQueueTooLarge() throws IOException { 1395 ClusterConnection conn = new MyConnectionImpl(CONF); 1396 AsyncProcessWithFailure ap = 1397 new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException()); 1398 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1399 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1400 Assert.assertNotNull(ap.createServerErrorTracker()); 1401 Put p = createPut(1, true); 1402 mutator.mutate(p); 1403 1404 try { 1405 mutator.flush(); 1406 Assert.fail(); 1407 } catch (RetriesExhaustedWithDetailsException expected) { 1408 assertEquals(1, expected.getNumExceptions()); 1409 assertTrue(expected.getRow(0) == p); 1410 } 1411 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1412 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1413 } 1414 /** 1415 * This test simulates multiple regions on 2 servers. We should have 2 multi requests and 1416 * 2 threads: 1 per server, this whatever the number of regions. 1417 */ 1418 @Test 1419 public void testThreadCreation() throws Exception { 1420 final int NB_REGS = 100; 1421 List<HRegionLocation> hrls = new ArrayList<>(NB_REGS); 1422 List<Get> gets = new ArrayList<>(NB_REGS); 1423 for (int i = 0; i < NB_REGS; i++) { 1424 HRegionInfo hri = new HRegionInfo( 1425 DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i); 1426 HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2); 1427 hrls.add(hrl); 1428 1429 Get get = new Get(Bytes.toBytes(i * 10L)); 1430 gets.add(get); 1431 } 1432 1433 MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF); 1434 MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); 1435 HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service); 1436 ht.multiAp = ap; 1437 ht.batch(gets, null); 1438 1439 Assert.assertEquals(NB_REGS, ap.nbActions.get()); 1440 Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); 1441 Assert.assertEquals("1 thread per server", 2, con.nbThreads.get()); 1442 1443 int nbReg = 0; 1444 for (int i =0; i<NB_REGS; i++){ 1445 if (con.usedRegions[i]) nbReg++; 1446 } 1447 Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg); 1448 } 1449 1450 @Test 1451 public void testReplicaReplicaSuccess() throws Exception { 1452 // Main call takes too long so replicas succeed, except for one region w/o replicas. 1453 // One region has no replica, so the main call succeeds for it. 1454 MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); 1455 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1456 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1457 .setPool(ap.service) 1458 .setRpcTimeout(RPC_TIMEOUT) 1459 .setOperationTimeout(OPERATION_TIMEOUT) 1460 .setTableName(DUMMY_TABLE) 1461 .setRowAccess(rows) 1462 .setResults(new Object[3]) 1463 .setSubmittedRows(SubmittedRows.ALL) 1464 .build(); 1465 AsyncRequestFuture ars = ap.submit(task); 1466 verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); 1467 Assert.assertEquals(2, ap.getReplicaCallCount()); 1468 } 1469 1470 @Test 1471 public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception { 1472 // Main call succeeds before replica calls are kicked off. 1473 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); 1474 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1475 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1476 .setPool(ap.service) 1477 .setRpcTimeout(RPC_TIMEOUT) 1478 .setOperationTimeout(OPERATION_TIMEOUT) 1479 .setTableName(DUMMY_TABLE) 1480 .setRowAccess(rows) 1481 .setResults(new Object[3]) 1482 .setSubmittedRows(SubmittedRows.ALL) 1483 .build(); 1484 AsyncRequestFuture ars = ap.submit(task); 1485 verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); 1486 Assert.assertEquals(0, ap.getReplicaCallCount()); 1487 } 1488 1489 @Test 1490 public void testReplicaParallelCallsSucceed() throws Exception { 1491 // Either main or replica can succeed. 1492 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); 1493 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1494 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1495 .setPool(ap.service) 1496 .setRpcTimeout(RPC_TIMEOUT) 1497 .setOperationTimeout(OPERATION_TIMEOUT) 1498 .setTableName(DUMMY_TABLE) 1499 .setRowAccess(rows) 1500 .setResults(new Object[2]) 1501 .setSubmittedRows(SubmittedRows.ALL) 1502 .build(); 1503 AsyncRequestFuture ars = ap.submit(task); 1504 verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); 1505 long replicaCalls = ap.getReplicaCallCount(); 1506 Assert.assertTrue(replicaCalls >= 0); 1507 Assert.assertTrue(replicaCalls <= 2); 1508 } 1509 1510 @Test 1511 public void testReplicaPartialReplicaCall() throws Exception { 1512 // One server is slow, so the result for its region comes from replica, whereas 1513 // the result for other region comes from primary before replica calls happen. 1514 // There should be no replica call for that region at all. 1515 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); 1516 ap.setPrimaryCallDelay(sn2, 2000); 1517 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1518 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1519 .setPool(ap.service) 1520 .setRpcTimeout(RPC_TIMEOUT) 1521 .setOperationTimeout(OPERATION_TIMEOUT) 1522 .setTableName(DUMMY_TABLE) 1523 .setRowAccess(rows) 1524 .setResults(new Object[2]) 1525 .setSubmittedRows(SubmittedRows.ALL) 1526 .build(); 1527 AsyncRequestFuture ars = ap.submit(task); 1528 verifyReplicaResult(ars, RR.FALSE, RR.TRUE); 1529 Assert.assertEquals(1, ap.getReplicaCallCount()); 1530 } 1531 1532 @Test 1533 public void testReplicaMainFailsBeforeReplicaCalls() throws Exception { 1534 // Main calls fail before replica calls can start - this is currently not handled. 1535 // It would probably never happen if we can get location (due to retries), 1536 // and it would require additional synchronization. 1537 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); 1538 ap.addFailures(hri1, hri2); 1539 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1540 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1541 .setPool(ap.service) 1542 .setRpcTimeout(RPC_TIMEOUT) 1543 .setOperationTimeout(OPERATION_TIMEOUT) 1544 .setTableName(DUMMY_TABLE) 1545 .setRowAccess(rows) 1546 .setResults(new Object[2]) 1547 .setSubmittedRows(SubmittedRows.ALL) 1548 .build(); 1549 AsyncRequestFuture ars = ap.submit(task); 1550 verifyReplicaResult(ars, RR.FAILED, RR.FAILED); 1551 Assert.assertEquals(0, ap.getReplicaCallCount()); 1552 } 1553 1554 @Test 1555 public void testReplicaReplicaSuccessWithParallelFailures() throws Exception { 1556 // Main calls fails after replica calls start. For two-replica region, one replica call 1557 // also fails. Regardless, we get replica results for both regions. 1558 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); 1559 ap.addFailures(hri1, hri1r2, hri2); 1560 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1561 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1562 .setPool(ap.service) 1563 .setRpcTimeout(RPC_TIMEOUT) 1564 .setOperationTimeout(OPERATION_TIMEOUT) 1565 .setTableName(DUMMY_TABLE) 1566 .setRowAccess(rows) 1567 .setResults(new Object[2]) 1568 .setSubmittedRows(SubmittedRows.ALL) 1569 .build(); 1570 AsyncRequestFuture ars = ap.submit(task); 1571 verifyReplicaResult(ars, RR.TRUE, RR.TRUE); 1572 Assert.assertEquals(2, ap.getReplicaCallCount()); 1573 } 1574 1575 @Test 1576 public void testReplicaAllCallsFailForOneRegion() throws Exception { 1577 // For one of the region, all 3, main and replica, calls fail. For the other, replica 1578 // call fails but its exception should not be visible as it did succeed. 1579 MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); 1580 ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); 1581 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1582 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1583 .setPool(ap.service) 1584 .setRpcTimeout(RPC_TIMEOUT) 1585 .setOperationTimeout(OPERATION_TIMEOUT) 1586 .setTableName(DUMMY_TABLE) 1587 .setRowAccess(rows) 1588 .setResults(new Object[2]) 1589 .setSubmittedRows(SubmittedRows.ALL) 1590 .build(); 1591 AsyncRequestFuture ars = ap.submit(task); 1592 verifyReplicaResult(ars, RR.FAILED, RR.FALSE); 1593 // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 1594 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); 1595 for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) { 1596 Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow()); 1597 } 1598 } 1599 1600 private MyAsyncProcessWithReplicas createReplicaAp( 1601 int replicaAfterMs, int primaryMs, int replicaMs) throws Exception { 1602 return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1); 1603 } 1604 1605 private MyAsyncProcessWithReplicas createReplicaAp( 1606 int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception { 1607 // TODO: this is kind of timing dependent... perhaps it should detect from createCaller 1608 // that the replica call has happened and that way control the ordering. 1609 Configuration conf = new Configuration(); 1610 ClusterConnection conn = createHConnectionWithReplicas(); 1611 conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); 1612 if (retries >= 0) { 1613 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1614 } 1615 MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf); 1616 ap.setCallDelays(primaryMs, replicaMs); 1617 return ap; 1618 } 1619 1620 private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, 1621 TableName name) { 1622 return new BufferedMutatorParams(name) 1623 .pool(ap.service) 1624 .rpcTimeout(RPC_TIMEOUT) 1625 .opertationTimeout(OPERATION_TIMEOUT); 1626 } 1627 1628 private static List<Get> makeTimelineGets(byte[]... rows) { 1629 List<Get> result = new ArrayList<>(rows.length); 1630 for (byte[] row : rows) { 1631 Get get = new Get(row); 1632 get.setConsistency(Consistency.TIMELINE); 1633 result.add(get); 1634 } 1635 return result; 1636 } 1637 1638 private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception { 1639 Object[] actual = ars.getResults(); 1640 Assert.assertEquals(expected.length, actual.length); 1641 for (int i = 0; i < expected.length; ++i) { 1642 Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable)); 1643 } 1644 } 1645 1646 /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */ 1647 private enum RR { 1648 TRUE, 1649 FALSE, 1650 DONT_CARE, 1651 FAILED 1652 } 1653 1654 private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception { 1655 Object[] actuals = ars.getResults(); 1656 Assert.assertEquals(expecteds.length, actuals.length); 1657 for (int i = 0; i < expecteds.length; ++i) { 1658 Object actual = actuals[i]; 1659 RR expected = expecteds[i]; 1660 Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable); 1661 if (expected != RR.FAILED && expected != RR.DONT_CARE) { 1662 Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale()); 1663 } 1664 } 1665 } 1666 1667 /** 1668 * @param regCnt the region: 1 to 3. 1669 * @param success if true, the put will succeed. 1670 * @return a put 1671 */ 1672 private Put createPut(int regCnt, boolean success) { 1673 Put p; 1674 if (!success) { 1675 p = new Put(FAILS); 1676 } else switch (regCnt){ 1677 case 1 : 1678 p = new Put(DUMMY_BYTES_1); 1679 break; 1680 case 2: 1681 p = new Put(DUMMY_BYTES_2); 1682 break; 1683 case 3: 1684 p = new Put(DUMMY_BYTES_3); 1685 break; 1686 default: 1687 throw new IllegalArgumentException("unknown " + regCnt); 1688 } 1689 1690 p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1691 1692 return p; 1693 } 1694 1695 static class MyThreadPoolExecutor extends ThreadPoolExecutor { 1696 public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime, 1697 TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) { 1698 super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue); 1699 } 1700 1701 @Override 1702 public Future submit(Runnable runnable) { 1703 throw new OutOfMemoryError("OutOfMemory error thrown by means"); 1704 } 1705 } 1706 1707 static class AsyncProcessForThrowableCheck extends AsyncProcess { 1708 public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { 1709 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory( 1710 conf)); 1711 } 1712 } 1713 1714 @Test 1715 public void testUncheckedException() throws Exception { 1716 // Test the case pool.submit throws unchecked exception 1717 ClusterConnection hc = createHConnection(); 1718 MyThreadPoolExecutor myPool = 1719 new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, 1720 new LinkedBlockingQueue<>(200)); 1721 AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF); 1722 1723 List<Put> puts = new ArrayList<>(1); 1724 puts.add(createPut(1, true)); 1725 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1726 .setPool(myPool) 1727 .setRpcTimeout(RPC_TIMEOUT) 1728 .setOperationTimeout(OPERATION_TIMEOUT) 1729 .setTableName(DUMMY_TABLE) 1730 .setRowAccess(puts) 1731 .setSubmittedRows(SubmittedRows.NORMAL) 1732 .build(); 1733 ap.submit(task); 1734 Assert.assertTrue(puts.isEmpty()); 1735 } 1736 1737 /** 1738 * Test and make sure we could use a special pause setting when retry with 1739 * CallQueueTooBigException, see HBASE-17114 1740 * @throws Exception if unexpected error happened during test 1741 */ 1742 @Test 1743 public void testRetryPauseWithCallQueueTooBigException() throws Exception { 1744 Configuration myConf = new Configuration(CONF); 1745 final long specialPause = 500L; 1746 final int retries = 1; 1747 myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); 1748 myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1749 ClusterConnection conn = new MyConnectionImpl(myConf); 1750 AsyncProcessWithFailure ap = 1751 new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); 1752 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1753 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1754 1755 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1756 1757 Put p = createPut(1, true); 1758 mutator.mutate(p); 1759 1760 long startTime = System.currentTimeMillis(); 1761 try { 1762 mutator.flush(); 1763 Assert.fail(); 1764 } catch (RetriesExhaustedWithDetailsException expected) { 1765 assertEquals(1, expected.getNumExceptions()); 1766 assertTrue(expected.getRow(0) == p); 1767 } 1768 long actualSleep = System.currentTimeMillis() - startTime; 1769 long expectedSleep = 0L; 1770 for (int i = 0; i < retries; i++) { 1771 expectedSleep += ConnectionUtils.getPauseTime(specialPause, i); 1772 // Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result 1773 actualSleep += (long) (specialPause * 0.01f); 1774 } 1775 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1776 Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms", 1777 actualSleep >= expectedSleep); 1778 1779 // check and confirm normal IOE will use the normal pause 1780 final long normalPause = 1781 myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 1782 ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); 1783 bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1784 mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1785 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1786 mutator.mutate(p); 1787 startTime = System.currentTimeMillis(); 1788 try { 1789 mutator.flush(); 1790 Assert.fail(); 1791 } catch (RetriesExhaustedWithDetailsException expected) { 1792 assertEquals(1, expected.getNumExceptions()); 1793 assertTrue(expected.getRow(0) == p); 1794 } 1795 actualSleep = System.currentTimeMillis() - startTime; 1796 expectedSleep = 0L; 1797 for (int i = 0; i < retries; i++) { 1798 expectedSleep += ConnectionUtils.getPauseTime(normalPause, i); 1799 } 1800 // plus an additional pause to balance the program execution time 1801 expectedSleep += normalPause; 1802 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1803 Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); 1804 } 1805 1806 @Test 1807 public void testRetryWithExceptionClearsMetaCache() throws Exception { 1808 ClusterConnection conn = createHConnection(); 1809 Configuration myConf = conn.getConfiguration(); 1810 myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 1811 1812 AsyncProcessWithFailure ap = 1813 new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test")); 1814 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1815 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1816 1817 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1818 1819 Assert.assertEquals( 1820 conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(), 1821 new RegionLocations(loc1).toString()); 1822 1823 Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any()); 1824 1825 Put p = createPut(1, true); 1826 mutator.mutate(p); 1827 1828 try { 1829 mutator.flush(); 1830 Assert.fail(); 1831 } catch (RetriesExhaustedWithDetailsException expected) { 1832 assertEquals(1, expected.getNumExceptions()); 1833 assertTrue(expected.getRow(0) == p); 1834 } 1835 1836 Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName()); 1837 } 1838 1839 @Test 1840 public void testQueueRowAccess() throws Exception { 1841 ClusterConnection conn = createHConnection(); 1842 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, 1843 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); 1844 Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1845 Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 1846 mutator.mutate(p0); 1847 BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess(); 1848 // QueueRowAccess should take all undealt mutations 1849 assertEquals(0, mutator.size()); 1850 mutator.mutate(p1); 1851 assertEquals(1, mutator.size()); 1852 BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess(); 1853 // QueueRowAccess should take all undealt mutations 1854 assertEquals(0, mutator.size()); 1855 assertEquals(1, ra0.size()); 1856 assertEquals(1, ra1.size()); 1857 Iterator<Row> iter0 = ra0.iterator(); 1858 Iterator<Row> iter1 = ra1.iterator(); 1859 assertTrue(iter0.hasNext()); 1860 assertTrue(iter1.hasNext()); 1861 // the next() will poll the mutation from inner buffer and update the buffer count 1862 assertTrue(iter0.next() == p0); 1863 assertEquals(1, mutator.getUnflushedSize()); 1864 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1865 assertTrue(iter1.next() == p1); 1866 assertEquals(0, mutator.getUnflushedSize()); 1867 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1868 assertFalse(iter0.hasNext()); 1869 assertFalse(iter1.hasNext()); 1870 // ra0 doest handle the mutation so the mutation won't be pushed back to buffer 1871 iter0.remove(); 1872 ra0.close(); 1873 assertEquals(0, mutator.size()); 1874 assertEquals(0, mutator.getUnflushedSize()); 1875 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1876 // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer 1877 ra1.close(); 1878 assertEquals(1, mutator.size()); 1879 assertEquals(1, mutator.getUnflushedSize()); 1880 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1881 } 1882}