001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034import java.util.Random; 035import java.util.Set; 036import java.util.TreeSet; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.Executors; 041import java.util.concurrent.Future; 042import java.util.concurrent.LinkedBlockingQueue; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.CallQueueTooBigException; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseClassTestRule; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.HRegionInfo; 056import org.apache.hadoop.hbase.HRegionLocation; 057import org.apache.hadoop.hbase.RegionLocations; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess; 061import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; 062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 063import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 064import org.apache.hadoop.hbase.client.coprocessor.Batch; 065import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 066import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 067import org.apache.hadoop.hbase.testclassification.ClientTests; 068import org.apache.hadoop.hbase.testclassification.MediumTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.Threads; 071import org.junit.Assert; 072import org.junit.Before; 073import org.junit.ClassRule; 074import org.junit.Test; 075import org.junit.experimental.categories.Category; 076import org.mockito.Mockito; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080@Category({ClientTests.class, MediumTests.class}) 081public class TestAsyncProcess { 082 083 @ClassRule 084 public static final HBaseClassTestRule CLASS_RULE = 085 HBaseClassTestRule.forClass(TestAsyncProcess.class); 086 087 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class); 088 private static final TableName DUMMY_TABLE = 089 TableName.valueOf("DUMMY_TABLE"); 090 private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1"); 091 private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); 092 private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); 093 private static final byte[] FAILS = Bytes.toBytes("FAILS"); 094 private Configuration CONF; 095 private ConnectionConfiguration CONNECTION_CONFIG; 096 private static final ServerName sn = ServerName.valueOf("s1,1,1"); 097 private static final ServerName sn2 = ServerName.valueOf("s2,2,2"); 098 private static final ServerName sn3 = ServerName.valueOf("s3,3,3"); 099 private static final HRegionInfo hri1 = 100 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); 101 private static final HRegionInfo hri2 = 102 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); 103 private static final HRegionInfo hri3 = 104 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); 105 private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn); 106 private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn); 107 private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2); 108 109 // Replica stuff 110 private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); 111 private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2); 112 private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); 113 private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), 114 new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)); 115 private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2), 116 new HRegionLocation(hri2r1, sn3)); 117 private static final RegionLocations hrls3 = 118 new RegionLocations(new HRegionLocation(hri3, sn3), null); 119 120 private static final String success = "success"; 121 private static Exception failure = new Exception("failure"); 122 123 private static final int NB_RETRIES = 3; 124 125 private int RPC_TIMEOUT; 126 private int OPERATION_TIMEOUT; 127 128 @Before 129 public void beforeEach() { 130 this.CONF = new Configuration(); 131 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); 132 this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF); 133 this.RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 134 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 135 this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 136 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 137 } 138 139 static class CountingThreadFactory implements ThreadFactory { 140 final AtomicInteger nbThreads; 141 ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess"); 142 @Override 143 public Thread newThread(Runnable r) { 144 nbThreads.incrementAndGet(); 145 return realFactory.newThread(r); 146 } 147 148 CountingThreadFactory(AtomicInteger nbThreads){ 149 this.nbThreads = nbThreads; 150 } 151 } 152 153 static class MyAsyncProcess extends AsyncProcess { 154 final AtomicInteger nbMultiResponse = new AtomicInteger(); 155 final AtomicInteger nbActions = new AtomicInteger(); 156 public List<AsyncRequestFuture> allReqs = new ArrayList<>(); 157 public AtomicInteger callsCt = new AtomicInteger(); 158 private Configuration conf; 159 160 private long previousTimeout = -1; 161 final ExecutorService service; 162 @Override 163 protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture( 164 AsyncProcessTask task, List<Action> actions, long nonceGroup) { 165 // Test HTable has tableName of null, so pass DUMMY_TABLE 166 AsyncProcessTask wrap = new AsyncProcessTask(task){ 167 @Override 168 public TableName getTableName() { 169 return DUMMY_TABLE; 170 } 171 }; 172 AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<>( 173 wrap, actions, nonceGroup, this); 174 allReqs.add(r); 175 return r; 176 } 177 178 public MyAsyncProcess(ClusterConnection hc, Configuration conf) { 179 super(hc, conf, 180 new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 181 service = Executors.newFixedThreadPool(5); 182 this.conf = conf; 183 } 184 185 public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { 186 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 187 service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, 188 new SynchronousQueue<>(), new CountingThreadFactory(nbThreads)); 189 } 190 191 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, 192 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, 193 boolean needResults) throws InterruptedIOException { 194 AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) 195 .setPool(pool == null ? service : pool) 196 .setTableName(tableName) 197 .setRowAccess(rows) 198 .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) 199 .setNeedResults(needResults) 200 .setRpcTimeout(conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, 201 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)) 202 .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 203 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)) 204 .build(); 205 return submit(task); 206 } 207 208 public <CResult> AsyncRequestFuture submit(TableName tableName, 209 final List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, 210 boolean needResults) throws InterruptedIOException { 211 return submit(null, tableName, rows, atLeastOne, callback, needResults); 212 } 213 214 @Override 215 public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task) 216 throws InterruptedIOException { 217 previousTimeout = task.getRpcTimeout(); 218 // We use results in tests to check things, so override to always save them. 219 AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) { 220 @Override 221 public boolean getNeedResults() { 222 return true; 223 } 224 }; 225 return super.submit(wrap); 226 } 227 228 @Override 229 protected RpcRetryingCaller<AbstractResponse> createCaller( 230 CancellableRegionServerCallable callable, int rpcTimeout) { 231 callsCt.incrementAndGet(); 232 MultiServerCallable callable1 = (MultiServerCallable) callable; 233 final MultiResponse mr = createMultiResponse( 234 callable1.getMulti(), nbMultiResponse, nbActions, 235 new ResponseGenerator() { 236 @Override 237 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 238 if (Arrays.equals(FAILS, a.getAction().getRow())) { 239 mr.add(regionName, a.getOriginalIndex(), failure); 240 } else { 241 mr.add(regionName, a.getOriginalIndex(), success); 242 } 243 } 244 }); 245 246 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { 247 @Override 248 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 249 int callTimeout) 250 throws IOException, RuntimeException { 251 try { 252 // sleep one second in order for threadpool to start another thread instead of reusing 253 // existing one. 254 Thread.sleep(1000); 255 } catch (InterruptedException e) { 256 // ignore error 257 } 258 return mr; 259 } 260 }; 261 } 262 263 264 } 265 266 static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { 267 private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); 268 public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, 269 long nonceGroup, AsyncProcess asyncProcess) { 270 super(task, actions, nonceGroup, asyncProcess); 271 } 272 273 @Override 274 protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { 275 // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. 276 } 277 278 Map<ServerName, List<Long>> getRequestHeapSize() { 279 return heapSizesByServer; 280 } 281 282 @Override 283 SingleServerRequestRunnable createSingleServerRequest( 284 MultiAction multiAction, int numAttempt, ServerName server, 285 Set<CancellableRegionServerCallable> callsInProgress) { 286 SingleServerRequestRunnable rq = new SingleServerRequestRunnable( 287 multiAction, numAttempt, server, callsInProgress); 288 List<Long> heapCount = heapSizesByServer.get(server); 289 if (heapCount == null) { 290 heapCount = new ArrayList<>(); 291 heapSizesByServer.put(server, heapCount); 292 } 293 heapCount.add(heapSizeOf(multiAction)); 294 return rq; 295 } 296 297 private long heapSizeOf(MultiAction multiAction) { 298 return multiAction.actions.values().stream() 299 .flatMap(v -> v.stream()) 300 .map(action -> action.getAction()) 301 .filter(row -> row instanceof Mutation) 302 .mapToLong(row -> ((Mutation) row).heapSize()) 303 .sum(); 304 } 305 } 306 307 static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{ 308 309 private final IOException e; 310 311 public CallerWithFailure(IOException e) { 312 super(100, 500, 100, 9); 313 this.e = e; 314 } 315 316 @Override 317 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 318 int callTimeout) 319 throws IOException, RuntimeException { 320 throw e; 321 } 322 } 323 324 325 static class AsyncProcessWithFailure extends MyAsyncProcess { 326 327 private final IOException ioe; 328 329 public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) { 330 super(hc, conf); 331 this.ioe = ioe; 332 serverTrackerTimeout = 1L; 333 } 334 335 @Override 336 protected RpcRetryingCaller<AbstractResponse> createCaller( 337 CancellableRegionServerCallable callable, int rpcTimeout) { 338 callsCt.incrementAndGet(); 339 return new CallerWithFailure(ioe); 340 } 341 } 342 343 /** 344 * Make the backoff time always different on each call. 345 */ 346 static class MyClientBackoffPolicy implements ClientBackoffPolicy { 347 private final Map<ServerName, AtomicInteger> count = new HashMap<>(); 348 @Override 349 public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { 350 AtomicInteger inc = count.get(serverName); 351 if (inc == null) { 352 inc = new AtomicInteger(0); 353 count.put(serverName, inc); 354 } 355 return inc.getAndIncrement(); 356 } 357 } 358 359 static class MyAsyncProcessWithReplicas extends MyAsyncProcess { 360 private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator()); 361 private long primarySleepMs = 0, replicaSleepMs = 0; 362 private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>(); 363 private final AtomicLong replicaCalls = new AtomicLong(0); 364 365 public void addFailures(RegionInfo... hris) { 366 for (RegionInfo hri : hris) { 367 failures.add(hri.getRegionName()); 368 } 369 } 370 371 public long getReplicaCallCount() { 372 return replicaCalls.get(); 373 } 374 375 public void setPrimaryCallDelay(ServerName server, long primaryMs) { 376 customPrimarySleepMs.put(server, primaryMs); 377 } 378 379 public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) { 380 super(hc, conf); 381 } 382 383 public void setCallDelays(long primaryMs, long replicaMs) { 384 this.primarySleepMs = primaryMs; 385 this.replicaSleepMs = replicaMs; 386 } 387 388 @Override 389 protected RpcRetryingCaller<AbstractResponse> createCaller( 390 CancellableRegionServerCallable payloadCallable, int rpcTimeout) { 391 MultiServerCallable callable = (MultiServerCallable) payloadCallable; 392 final MultiResponse mr = createMultiResponse( 393 callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { 394 @Override 395 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 396 if (failures.contains(regionName)) { 397 mr.add(regionName, a.getOriginalIndex(), failure); 398 } else { 399 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId()); 400 mr.add(regionName, a.getOriginalIndex(), 401 Result.create(new Cell[0], null, isStale)); 402 } 403 } 404 }); 405 // Currently AsyncProcess either sends all-replica, or all-primary request. 406 final boolean isDefault = RegionReplicaUtil.isDefaultReplica( 407 callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId()); 408 final ServerName server = ((MultiServerCallable)callable).getServerName(); 409 String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " 410 + callable.getMulti().actions.size() + " entries: "; 411 for (byte[] region : callable.getMulti().actions.keySet()) { 412 debugMsg += "[" + Bytes.toStringBinary(region) + "], "; 413 } 414 LOG.debug(debugMsg); 415 if (!isDefault) { 416 replicaCalls.incrementAndGet(); 417 } 418 419 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { 420 @Override 421 public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 422 int callTimeout) 423 throws IOException, RuntimeException { 424 long sleep = -1; 425 if (isDefault) { 426 Long customSleep = customPrimarySleepMs.get(server); 427 sleep = (customSleep == null ? primarySleepMs : customSleep.longValue()); 428 } else { 429 sleep = replicaSleepMs; 430 } 431 if (sleep != 0) { 432 try { 433 Thread.sleep(sleep); 434 } catch (InterruptedException e) { 435 } 436 } 437 return mr; 438 } 439 }; 440 } 441 } 442 443 static MultiResponse createMultiResponse(final MultiAction multi, 444 AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) { 445 final MultiResponse mr = new MultiResponse(); 446 nbMultiResponse.incrementAndGet(); 447 for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) { 448 byte[] regionName = entry.getKey(); 449 for (Action a : entry.getValue()) { 450 nbActions.incrementAndGet(); 451 gen.addResponse(mr, regionName, a); 452 } 453 } 454 return mr; 455 } 456 457 private static interface ResponseGenerator { 458 void addResponse(final MultiResponse mr, byte[] regionName, Action a); 459 } 460 461 /** 462 * Returns our async process. 463 */ 464 static class MyConnectionImpl extends ConnectionImplementation { 465 public static class TestRegistry extends DoNothingAsyncRegistry { 466 467 public TestRegistry(Configuration conf) { 468 super(conf); 469 } 470 471 @Override 472 public CompletableFuture<String> getClusterId() { 473 return CompletableFuture.completedFuture("testClusterId"); 474 } 475 476 @Override 477 public CompletableFuture<Integer> getCurrentNrHRS() { 478 return CompletableFuture.completedFuture(1); 479 } 480 } 481 482 final AtomicInteger nbThreads = new AtomicInteger(0); 483 484 protected MyConnectionImpl(Configuration conf) throws IOException { 485 super(setupConf(conf), null, null); 486 } 487 488 private static Configuration setupConf(Configuration conf) { 489 conf.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, 490 AsyncRegistry.class); 491 return conf; 492 } 493 494 @Override 495 public RegionLocations locateRegion(TableName tableName, 496 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { 497 return new RegionLocations(loc1); 498 } 499 500 @Override 501 public boolean hasCellBlockSupport() { 502 return false; 503 } 504 } 505 506 /** 507 * Returns our async process. 508 */ 509 static class MyConnectionImpl2 extends MyConnectionImpl { 510 List<HRegionLocation> hrl; 511 final boolean usedRegions[]; 512 513 protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException { 514 super(conf); 515 this.hrl = hrl; 516 this.usedRegions = new boolean[hrl.size()]; 517 } 518 519 @Override 520 public RegionLocations locateRegion(TableName tableName, 521 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { 522 int i = 0; 523 for (HRegionLocation hr : hrl){ 524 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { 525 usedRegions[i] = true; 526 return new RegionLocations(hr); 527 } 528 i++; 529 } 530 return null; 531 } 532 } 533 @Test 534 public void testListRowAccess() { 535 int count = 10; 536 List<String> values = new LinkedList<>(); 537 for (int i = 0; i != count; ++i) { 538 values.add(String.valueOf(i)); 539 } 540 541 ListRowAccess<String> taker = new ListRowAccess(values); 542 assertEquals(count, taker.size()); 543 544 int restoreCount = 0; 545 int takeCount = 0; 546 Iterator<String> it = taker.iterator(); 547 while (it.hasNext()) { 548 String v = it.next(); 549 assertEquals(String.valueOf(takeCount), v); 550 ++takeCount; 551 it.remove(); 552 if (Math.random() >= 0.5) { 553 break; 554 } 555 } 556 assertEquals(count, taker.size() + takeCount); 557 558 it = taker.iterator(); 559 while (it.hasNext()) { 560 String v = it.next(); 561 assertEquals(String.valueOf(takeCount), v); 562 ++takeCount; 563 it.remove(); 564 } 565 assertEquals(0, taker.size()); 566 assertEquals(count, takeCount); 567 } 568 private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) { 569 if (putSizePerServer <= maxHeapSizePerRequest) { 570 return 1; 571 } else if (putSizePerServer % maxHeapSizePerRequest == 0) { 572 return putSizePerServer / maxHeapSizePerRequest; 573 } else { 574 return putSizePerServer / maxHeapSizePerRequest + 1; 575 } 576 } 577 578 @Test 579 public void testSubmitSameSizeOfRequest() throws Exception { 580 long writeBuffer = 2 * 1024 * 1024; 581 long putsHeapSize = writeBuffer; 582 doSubmitRequest(writeBuffer, putsHeapSize); 583 } 584 585 @Test 586 public void testSubmitLargeRequestWithUnlimitedSize() throws Exception { 587 long maxHeapSizePerRequest = Long.MAX_VALUE; 588 long putsHeapSize = 2 * 1024 * 1024; 589 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 590 } 591 592 @Test 593 public void testSubmitRandomSizeRequest() throws Exception { 594 Random rn = new Random(); 595 final long limit = 10 * 1024 * 1024; 596 final int requestCount = 1 + (int) (rn.nextDouble() * 3); 597 long n = rn.nextLong(); 598 if (n < 0) { 599 n = -n; 600 } else if (n == 0) { 601 n = 1; 602 } 603 long putsHeapSize = n % limit; 604 long maxHeapSizePerRequest = putsHeapSize / requestCount; 605 LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest + 606 ", putsHeapSize=" + putsHeapSize); 607 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 608 } 609 610 @Test 611 public void testSubmitSmallRequest() throws Exception { 612 long maxHeapSizePerRequest = 2 * 1024 * 1024; 613 long putsHeapSize = 100; 614 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 615 } 616 617 @Test 618 public void testSubmitLargeRequest() throws Exception { 619 long maxHeapSizePerRequest = 2 * 1024 * 1024; 620 long putsHeapSize = maxHeapSizePerRequest * 2; 621 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 622 } 623 624 private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception { 625 ClusterConnection conn = createHConnection(); 626 final String defaultClazz = 627 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 628 final long defaultHeapSizePerRequest = conn.getConfiguration().getLong( 629 SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 630 SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); 631 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 632 SimpleRequestController.class.getName()); 633 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 634 maxHeapSizePerRequest); 635 636 // sn has two regions 637 long putSizeSN = 0; 638 long putSizeSN2 = 0; 639 List<Put> puts = new ArrayList<>(); 640 while ((putSizeSN + putSizeSN2) <= putsHeapSize) { 641 Put put1 = new Put(DUMMY_BYTES_1); 642 put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 643 Put put2 = new Put(DUMMY_BYTES_2); 644 put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 645 Put put3 = new Put(DUMMY_BYTES_3); 646 put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3); 647 putSizeSN += (put1.heapSize() + put2.heapSize()); 648 putSizeSN2 += put3.heapSize(); 649 puts.add(put1); 650 puts.add(put2); 651 puts.add(put3); 652 } 653 654 int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest); 655 int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest); 656 LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN 657 + ", putSizeSN2:" + putSizeSN2 658 + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest 659 + ", minCountSnRequest:" + minCountSnRequest 660 + ", minCountSn2Request:" + minCountSn2Request); 661 662 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 663 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 664 try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) { 665 mutator.mutate(puts); 666 mutator.flush(); 667 List<AsyncRequestFuture> reqs = ap.allReqs; 668 669 int actualSnReqCount = 0; 670 int actualSn2ReqCount = 0; 671 for (AsyncRequestFuture req : reqs) { 672 if (!(req instanceof AsyncRequestFutureImpl)) { 673 continue; 674 } 675 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 676 if (ars.getRequestHeapSize().containsKey(sn)) { 677 ++actualSnReqCount; 678 } 679 if (ars.getRequestHeapSize().containsKey(sn2)) { 680 ++actualSn2ReqCount; 681 } 682 } 683 // If the server is busy, the actual count may be incremented. 684 assertEquals(true, minCountSnRequest <= actualSnReqCount); 685 assertEquals(true, minCountSn2Request <= actualSn2ReqCount); 686 Map<ServerName, Long> sizePerServers = new HashMap<>(); 687 for (AsyncRequestFuture req : reqs) { 688 if (!(req instanceof AsyncRequestFutureImpl)) { 689 continue; 690 } 691 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 692 Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); 693 for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { 694 long sum = 0; 695 for (long size : entry.getValue()) { 696 assertEquals(true, size <= maxHeapSizePerRequest); 697 sum += size; 698 } 699 assertEquals(true, sum <= maxHeapSizePerRequest); 700 long value = sizePerServers.getOrDefault(entry.getKey(), 0L); 701 sizePerServers.put(entry.getKey(), value + sum); 702 } 703 } 704 assertEquals(true, sizePerServers.containsKey(sn)); 705 assertEquals(true, sizePerServers.containsKey(sn2)); 706 assertEquals(false, sizePerServers.containsKey(sn3)); 707 assertEquals(putSizeSN, (long) sizePerServers.get(sn)); 708 assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); 709 } 710 // restore config. 711 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 712 defaultHeapSizePerRequest); 713 if (defaultClazz != null) { 714 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 715 defaultClazz); 716 } 717 } 718 719 @Test 720 public void testSubmit() throws Exception { 721 ClusterConnection hc = createHConnection(); 722 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 723 724 List<Put> puts = new ArrayList<>(1); 725 puts.add(createPut(1, true)); 726 727 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 728 Assert.assertTrue(puts.isEmpty()); 729 } 730 731 @Test 732 public void testSubmitWithCB() throws Exception { 733 ClusterConnection hc = createHConnection(); 734 final AtomicInteger updateCalled = new AtomicInteger(0); 735 Batch.Callback<Object> cb = new Batch.Callback<Object>() { 736 @Override 737 public void update(byte[] region, byte[] row, Object result) { 738 updateCalled.incrementAndGet(); 739 } 740 }; 741 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 742 743 List<Put> puts = new ArrayList<>(1); 744 puts.add(createPut(1, true)); 745 746 final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false); 747 Assert.assertTrue(puts.isEmpty()); 748 ars.waitUntilDone(); 749 Assert.assertEquals(1, updateCalled.get()); 750 } 751 752 @Test 753 public void testSubmitBusyRegion() throws Exception { 754 ClusterConnection conn = createHConnection(); 755 final String defaultClazz = 756 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 757 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 758 SimpleRequestController.class.getName()); 759 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 760 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 761 List<Put> puts = new ArrayList<>(1); 762 puts.add(createPut(1, true)); 763 764 for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) { 765 ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 766 } 767 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 768 Assert.assertEquals(puts.size(), 1); 769 770 ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 771 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 772 Assert.assertEquals(0, puts.size()); 773 if (defaultClazz != null) { 774 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 775 defaultClazz); 776 } 777 } 778 779 780 @Test 781 public void testSubmitBusyRegionServer() throws Exception { 782 ClusterConnection conn = createHConnection(); 783 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 784 final String defaultClazz = 785 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 786 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 787 SimpleRequestController.class.getName()); 788 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 789 controller.taskCounterPerServer.put(sn2, 790 new AtomicInteger(controller.maxConcurrentTasksPerServer)); 791 792 List<Put> puts = new ArrayList<>(4); 793 puts.add(createPut(1, true)); 794 puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy 795 puts.add(createPut(1, true)); // <== this one will make it, the region is already in 796 puts.add(createPut(2, true)); // <== new region, but the rs is ok 797 798 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 799 Assert.assertEquals(" puts=" + puts, 1, puts.size()); 800 801 controller.taskCounterPerServer.put(sn2, 802 new AtomicInteger(controller.maxConcurrentTasksPerServer - 1)); 803 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 804 Assert.assertTrue(puts.isEmpty()); 805 if (defaultClazz != null) { 806 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 807 defaultClazz); 808 } 809 } 810 811 @Test 812 public void testFail() throws Exception { 813 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 814 815 List<Put> puts = new ArrayList<>(1); 816 Put p = createPut(1, false); 817 puts.add(p); 818 819 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 820 Assert.assertEquals(0, puts.size()); 821 ars.waitUntilDone(); 822 verifyResult(ars, false); 823 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 824 825 Assert.assertEquals(1, ars.getErrors().exceptions.size()); 826 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 827 failure.equals(ars.getErrors().exceptions.get(0))); 828 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 829 failure.equals(ars.getErrors().exceptions.get(0))); 830 831 Assert.assertEquals(1, ars.getFailedOperations().size()); 832 Assert.assertTrue("was: " + ars.getFailedOperations().get(0), 833 p.equals(ars.getFailedOperations().get(0))); 834 } 835 836 837 @Test 838 public void testSubmitTrue() throws IOException { 839 ClusterConnection conn = createHConnection(); 840 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 841 final String defaultClazz = 842 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 843 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 844 SimpleRequestController.class.getName()); 845 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 846 controller.tasksInProgress.incrementAndGet(); 847 final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion); 848 controller.taskCounterPerRegion.put(hri1.getRegionName(), ai); 849 850 final AtomicBoolean checkPoint = new AtomicBoolean(false); 851 final AtomicBoolean checkPoint2 = new AtomicBoolean(false); 852 853 Thread t = new Thread(){ 854 @Override 855 public void run(){ 856 Threads.sleep(1000); 857 Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent 858 ai.decrementAndGet(); 859 controller.tasksInProgress.decrementAndGet(); 860 checkPoint2.set(true); 861 } 862 }; 863 864 List<Put> puts = new ArrayList<>(1); 865 Put p = createPut(1, true); 866 puts.add(p); 867 868 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 869 Assert.assertFalse(puts.isEmpty()); 870 871 t.start(); 872 873 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 874 Assert.assertTrue(puts.isEmpty()); 875 876 checkPoint.set(true); 877 while (!checkPoint2.get()){ 878 Threads.sleep(1); 879 } 880 if (defaultClazz != null) { 881 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 882 defaultClazz); 883 } 884 } 885 886 @Test 887 public void testFailAndSuccess() throws Exception { 888 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 889 890 List<Put> puts = new ArrayList<>(3); 891 puts.add(createPut(1, false)); 892 puts.add(createPut(1, true)); 893 puts.add(createPut(1, true)); 894 895 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 896 Assert.assertTrue(puts.isEmpty()); 897 ars.waitUntilDone(); 898 verifyResult(ars, false, true, true); 899 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 900 ap.callsCt.set(0); 901 Assert.assertEquals(1, ars.getErrors().actions.size()); 902 903 puts.add(createPut(1, true)); 904 // Wait for AP to be free. While ars might have the result, ap counters are decreased later. 905 ap.waitForMaximumCurrentTasks(0, null); 906 ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 907 Assert.assertEquals(0, puts.size()); 908 ars.waitUntilDone(); 909 Assert.assertEquals(1, ap.callsCt.get()); 910 verifyResult(ars, true); 911 } 912 913 @Test 914 public void testFlush() throws Exception { 915 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 916 917 List<Put> puts = new ArrayList<>(3); 918 puts.add(createPut(1, false)); 919 puts.add(createPut(1, true)); 920 puts.add(createPut(1, true)); 921 922 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 923 ars.waitUntilDone(); 924 verifyResult(ars, false, true, true); 925 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 926 927 Assert.assertEquals(1, ars.getFailedOperations().size()); 928 } 929 930 @Test 931 public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { 932 ClusterConnection hc = createHConnection(); 933 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 934 testTaskCount(ap); 935 } 936 937 @Test 938 public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { 939 Configuration copyConf = new Configuration(CONF); 940 copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); 941 MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); 942 ClusterConnection conn = createHConnection(); 943 Mockito.when(conn.getConfiguration()).thenReturn(copyConf); 944 Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); 945 Mockito.when(conn.getBackoffPolicy()).thenReturn(bp); 946 final String defaultClazz = 947 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 948 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 949 SimpleRequestController.class.getName()); 950 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 951 testTaskCount(ap); 952 if (defaultClazz != null) { 953 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 954 defaultClazz); 955 } 956 } 957 958 private void testTaskCount(MyAsyncProcess ap) 959 throws InterruptedIOException, InterruptedException { 960 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 961 List<Put> puts = new ArrayList<>(); 962 for (int i = 0; i != 3; ++i) { 963 puts.add(createPut(1, true)); 964 puts.add(createPut(2, true)); 965 puts.add(createPut(3, true)); 966 } 967 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 968 ap.waitForMaximumCurrentTasks(0, null); 969 // More time to wait if there are incorrect task count. 970 TimeUnit.SECONDS.sleep(1); 971 assertEquals(0, controller.tasksInProgress.get()); 972 for (AtomicInteger count : controller.taskCounterPerRegion.values()) { 973 assertEquals(0, count.get()); 974 } 975 for (AtomicInteger count : controller.taskCounterPerServer.values()) { 976 assertEquals(0, count.get()); 977 } 978 } 979 980 @Test 981 public void testMaxTask() throws Exception { 982 ClusterConnection conn = createHConnection(); 983 final String defaultClazz = 984 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 985 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 986 SimpleRequestController.class.getName()); 987 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 988 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 989 990 991 for (int i = 0; i < 1000; i++) { 992 ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 993 } 994 995 final Thread myThread = Thread.currentThread(); 996 997 Thread t = new Thread() { 998 @Override 999 public void run() { 1000 Threads.sleep(2000); 1001 myThread.interrupt(); 1002 } 1003 }; 1004 1005 List<Put> puts = new ArrayList<>(1); 1006 puts.add(createPut(1, true)); 1007 1008 t.start(); 1009 1010 try { 1011 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 1012 Assert.fail("We should have been interrupted."); 1013 } catch (InterruptedIOException expected) { 1014 } 1015 1016 final long sleepTime = 2000; 1017 1018 Thread t2 = new Thread() { 1019 @Override 1020 public void run() { 1021 Threads.sleep(sleepTime); 1022 while (controller.tasksInProgress.get() > 0) { 1023 ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 1024 } 1025 } 1026 }; 1027 t2.start(); 1028 1029 long start = System.currentTimeMillis(); 1030 ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false); 1031 long end = System.currentTimeMillis(); 1032 1033 //Adds 100 to secure us against approximate timing. 1034 Assert.assertTrue(start + 100L + sleepTime > end); 1035 if (defaultClazz != null) { 1036 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 1037 defaultClazz); 1038 } 1039 } 1040 1041 private ClusterConnection createHConnection() throws IOException { 1042 ClusterConnection hc = createHConnectionCommon(); 1043 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); 1044 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); 1045 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); 1046 Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), 1047 Mockito.anyBoolean())).thenReturn(Arrays.asList(loc1, loc2, loc3)); 1048 setMockLocation(hc, FAILS, new RegionLocations(loc2)); 1049 return hc; 1050 } 1051 1052 private ClusterConnection createHConnectionWithReplicas() throws IOException { 1053 ClusterConnection hc = createHConnectionCommon(); 1054 setMockLocation(hc, DUMMY_BYTES_1, hrls1); 1055 setMockLocation(hc, DUMMY_BYTES_2, hrls2); 1056 setMockLocation(hc, DUMMY_BYTES_3, hrls3); 1057 List<HRegionLocation> locations = new ArrayList<>(); 1058 for (HRegionLocation loc : hrls1.getRegionLocations()) { 1059 locations.add(loc); 1060 } 1061 for (HRegionLocation loc : hrls2.getRegionLocations()) { 1062 locations.add(loc); 1063 } 1064 for (HRegionLocation loc : hrls3.getRegionLocations()) { 1065 locations.add(loc); 1066 } 1067 Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), 1068 Mockito.anyBoolean())).thenReturn(locations); 1069 return hc; 1070 } 1071 1072 private static void setMockLocation(ClusterConnection hc, byte[] row, 1073 RegionLocations result) throws IOException { 1074 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), 1075 Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); 1076 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), 1077 Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); 1078 } 1079 1080 private ClusterConnection createHConnectionCommon() { 1081 ClusterConnection hc = Mockito.mock(ClusterConnection.class); 1082 NonceGenerator ng = Mockito.mock(NonceGenerator.class); 1083 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); 1084 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); 1085 Mockito.when(hc.getConfiguration()).thenReturn(CONF); 1086 Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); 1087 return hc; 1088 } 1089 1090 @Test 1091 public void testHTablePutSuccess() throws Exception { 1092 ClusterConnection conn = createHConnection(); 1093 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1094 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1095 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1096 1097 Put put = createPut(1, true); 1098 1099 Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), 1100 ht.getWriteBufferSize()); 1101 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1102 ht.mutate(put); 1103 ht.flush(); 1104 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1105 } 1106 1107 @Test 1108 public void testSettingWriteBufferPeriodicFlushParameters() throws Exception { 1109 ClusterConnection conn = createHConnection(); 1110 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1111 1112 checkPeriodicFlushParameters(conn, ap, 1113 1234, 1234, 1114 1234, 1234); 1115 checkPeriodicFlushParameters(conn, ap, 1116 0, 0, 1117 0, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1118 checkPeriodicFlushParameters(conn, ap, 1119 -1234, 0, 1120 -1234, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1121 checkPeriodicFlushParameters(conn, ap, 1122 1, 1, 1123 1, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1124 } 1125 1126 private void checkPeriodicFlushParameters(ClusterConnection conn, 1127 MyAsyncProcess ap, 1128 long setTO, long expectTO, 1129 long setTT, long expectTT 1130 ) { 1131 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1132 1133 // The BufferedMutatorParams does nothing with the value 1134 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO); 1135 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT); 1136 Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs()); 1137 Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs()); 1138 1139 // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams) 1140 BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap); 1141 Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs()); 1142 Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs()); 1143 1144 // The BufferedMutatorImpl corrects illegal values (direct via setter) 1145 BufferedMutatorImpl ht2 = 1146 new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap); 1147 ht2.setWriteBufferPeriodicFlush(setTO, setTT); 1148 Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs()); 1149 Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs()); 1150 1151 } 1152 1153 @Test 1154 public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception { 1155 ClusterConnection conn = createHConnection(); 1156 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1157 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1158 1159 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP 1160 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms 1161 bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record 1162 1163 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1164 1165 // Verify if BufferedMutator has the right settings. 1166 Assert.assertEquals(10000, ht.getWriteBufferSize()); 1167 Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs()); 1168 Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, 1169 ht.getWriteBufferPeriodicFlushTimerTickMs()); 1170 1171 Put put = createPut(1, true); 1172 1173 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1174 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1175 1176 // ----- Insert, flush immediately, MUST NOT flush automatically 1177 ht.mutate(put); 1178 ht.flush(); 1179 1180 Thread.sleep(1000); 1181 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1182 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1183 1184 // ----- Insert, NO flush, MUST flush automatically 1185 ht.mutate(put); 1186 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1187 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1188 1189 // The timerTick should fire every 100ms, so after twice that we must have 1190 // seen at least 1 tick and we should see an automatic flush 1191 Thread.sleep(200); 1192 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1193 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1194 1195 // Ensure it does not flush twice 1196 Thread.sleep(200); 1197 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1198 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1199 1200 // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically 1201 ht.disableWriteBufferPeriodicFlush(); 1202 ht.mutate(put); 1203 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1204 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1205 1206 // Wait for at least 1 timerTick, we should see NO flushes. 1207 Thread.sleep(200); 1208 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1209 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1210 1211 // Reenable periodic flushing, a flush seems to take about 1 second 1212 // so we wait for 2 seconds and it should have finished the flush. 1213 ht.setWriteBufferPeriodicFlush(1, 100); 1214 Thread.sleep(2000); 1215 Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes()); 1216 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1217 } 1218 1219 1220 @Test 1221 public void testBufferedMutatorImplWithSharedPool() throws Exception { 1222 ClusterConnection conn = createHConnection(); 1223 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1224 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1225 BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1226 1227 ht.close(); 1228 assertFalse(ap.service.isShutdown()); 1229 } 1230 1231 @Test 1232 public void testFailedPutAndNewPut() throws Exception { 1233 ClusterConnection conn = createHConnection(); 1234 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1235 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE) 1236 .writeBufferSize(0); 1237 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1238 1239 Put p = createPut(1, false); 1240 try { 1241 mutator.mutate(p); 1242 Assert.fail(); 1243 } catch (RetriesExhaustedWithDetailsException expected) { 1244 assertEquals(1, expected.getNumExceptions()); 1245 assertTrue(expected.getRow(0) == p); 1246 } 1247 // Let's do all the retries. 1248 ap.waitForMaximumCurrentTasks(0, null); 1249 Assert.assertEquals(0, mutator.size()); 1250 1251 // There is no global error so the new put should not fail 1252 mutator.mutate(createPut(1, true)); 1253 Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); 1254 } 1255 1256 @SuppressWarnings("SelfComparison") 1257 @Test 1258 public void testAction() { 1259 Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10); 1260 Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1261 Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1262 Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10); 1263 assertFalse(action_0.equals(action_1)); 1264 assertTrue(action_0.equals(action_0)); 1265 assertTrue(action_1.equals(action_2)); 1266 assertTrue(action_2.equals(action_1)); 1267 assertFalse(action_0.equals(new Put(Bytes.toBytes("abc")))); 1268 assertTrue(action_2.equals(action_3)); 1269 assertFalse(action_0.equals(action_3)); 1270 assertEquals(0, action_0.compareTo(action_0)); 1271 assertTrue(action_0.compareTo(action_1) < 0); 1272 assertTrue(action_1.compareTo(action_0) > 0); 1273 assertEquals(0, action_1.compareTo(action_2)); 1274 } 1275 1276 @Test 1277 public void testBatch() throws IOException, InterruptedException { 1278 ClusterConnection conn = new MyConnectionImpl(CONF); 1279 HTable ht = (HTable) conn.getTable(DUMMY_TABLE); 1280 ht.multiAp = new MyAsyncProcess(conn, CONF); 1281 1282 List<Put> puts = new ArrayList<>(7); 1283 puts.add(createPut(1, true)); 1284 puts.add(createPut(1, true)); 1285 puts.add(createPut(1, true)); 1286 puts.add(createPut(1, true)); 1287 puts.add(createPut(1, false)); // <=== the bad apple, position 4 1288 puts.add(createPut(1, true)); 1289 puts.add(createPut(1, false)); // <=== another bad apple, position 6 1290 1291 Object[] res = new Object[puts.size()]; 1292 try { 1293 ht.batch(puts, res); 1294 Assert.fail(); 1295 } catch (RetriesExhaustedException expected) { 1296 } 1297 1298 Assert.assertEquals(success, res[0]); 1299 Assert.assertEquals(success, res[1]); 1300 Assert.assertEquals(success, res[2]); 1301 Assert.assertEquals(success, res[3]); 1302 Assert.assertEquals(failure, res[4]); 1303 Assert.assertEquals(success, res[5]); 1304 Assert.assertEquals(failure, res[6]); 1305 } 1306 @Test 1307 public void testErrorsServers() throws IOException { 1308 Configuration configuration = new Configuration(CONF); 1309 ClusterConnection conn = new MyConnectionImpl(configuration); 1310 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration); 1311 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1312 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1313 configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); 1314 1315 Assert.assertNotNull(ap.createServerErrorTracker()); 1316 Assert.assertTrue(ap.serverTrackerTimeout > 200L); 1317 ap.serverTrackerTimeout = 1L; 1318 1319 Put p = createPut(1, false); 1320 mutator.mutate(p); 1321 1322 try { 1323 mutator.flush(); 1324 Assert.fail(); 1325 } catch (RetriesExhaustedWithDetailsException expected) { 1326 assertEquals(1, expected.getNumExceptions()); 1327 assertTrue(expected.getRow(0) == p); 1328 } 1329 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1330 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1331 } 1332 1333 @Test 1334 public void testReadAndWriteTimeout() throws IOException { 1335 final long readTimeout = 10 * 1000; 1336 final long writeTimeout = 20 * 1000; 1337 Configuration copyConf = new Configuration(CONF); 1338 copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); 1339 copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); 1340 ClusterConnection conn = new MyConnectionImpl(copyConf); 1341 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 1342 try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) { 1343 ht.multiAp = ap; 1344 List<Get> gets = new LinkedList<>(); 1345 gets.add(new Get(DUMMY_BYTES_1)); 1346 gets.add(new Get(DUMMY_BYTES_2)); 1347 try { 1348 ht.get(gets); 1349 } catch (ClassCastException e) { 1350 // No result response on this test. 1351 } 1352 assertEquals(readTimeout, ap.previousTimeout); 1353 ap.previousTimeout = -1; 1354 1355 try { 1356 ht.existsAll(gets); 1357 } catch (ClassCastException e) { 1358 // No result response on this test. 1359 } 1360 assertEquals(readTimeout, ap.previousTimeout); 1361 ap.previousTimeout = -1; 1362 1363 List<Delete> deletes = new LinkedList<>(); 1364 deletes.add(new Delete(DUMMY_BYTES_1)); 1365 deletes.add(new Delete(DUMMY_BYTES_2)); 1366 ht.delete(deletes); 1367 assertEquals(writeTimeout, ap.previousTimeout); 1368 } 1369 } 1370 1371 @Test 1372 public void testErrors() throws IOException { 1373 ClusterConnection conn = new MyConnectionImpl(CONF); 1374 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); 1375 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1376 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1377 1378 Assert.assertNotNull(ap.createServerErrorTracker()); 1379 1380 Put p = createPut(1, true); 1381 mutator.mutate(p); 1382 1383 try { 1384 mutator.flush(); 1385 Assert.fail(); 1386 } catch (RetriesExhaustedWithDetailsException expected) { 1387 assertEquals(1, expected.getNumExceptions()); 1388 assertTrue(expected.getRow(0) == p); 1389 } 1390 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1391 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1392 } 1393 1394 1395 @Test 1396 public void testCallQueueTooLarge() throws IOException { 1397 ClusterConnection conn = new MyConnectionImpl(CONF); 1398 AsyncProcessWithFailure ap = 1399 new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException()); 1400 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1401 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1402 Assert.assertNotNull(ap.createServerErrorTracker()); 1403 Put p = createPut(1, true); 1404 mutator.mutate(p); 1405 1406 try { 1407 mutator.flush(); 1408 Assert.fail(); 1409 } catch (RetriesExhaustedWithDetailsException expected) { 1410 assertEquals(1, expected.getNumExceptions()); 1411 assertTrue(expected.getRow(0) == p); 1412 } 1413 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1414 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1415 } 1416 /** 1417 * This test simulates multiple regions on 2 servers. We should have 2 multi requests and 1418 * 2 threads: 1 per server, this whatever the number of regions. 1419 */ 1420 @Test 1421 public void testThreadCreation() throws Exception { 1422 final int NB_REGS = 100; 1423 List<HRegionLocation> hrls = new ArrayList<>(NB_REGS); 1424 List<Get> gets = new ArrayList<>(NB_REGS); 1425 for (int i = 0; i < NB_REGS; i++) { 1426 HRegionInfo hri = new HRegionInfo( 1427 DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i); 1428 HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2); 1429 hrls.add(hrl); 1430 1431 Get get = new Get(Bytes.toBytes(i * 10L)); 1432 gets.add(get); 1433 } 1434 1435 MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF); 1436 MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); 1437 HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service); 1438 ht.multiAp = ap; 1439 ht.batch(gets, null); 1440 1441 Assert.assertEquals(NB_REGS, ap.nbActions.get()); 1442 Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); 1443 Assert.assertEquals("1 thread per server", 2, con.nbThreads.get()); 1444 1445 int nbReg = 0; 1446 for (int i =0; i<NB_REGS; i++){ 1447 if (con.usedRegions[i]) nbReg++; 1448 } 1449 Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg); 1450 } 1451 1452 @Test 1453 public void testReplicaReplicaSuccess() throws Exception { 1454 // Main call takes too long so replicas succeed, except for one region w/o replicas. 1455 // One region has no replica, so the main call succeeds for it. 1456 MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); 1457 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1458 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1459 .setPool(ap.service) 1460 .setRpcTimeout(RPC_TIMEOUT) 1461 .setOperationTimeout(OPERATION_TIMEOUT) 1462 .setTableName(DUMMY_TABLE) 1463 .setRowAccess(rows) 1464 .setResults(new Object[3]) 1465 .setSubmittedRows(SubmittedRows.ALL) 1466 .build(); 1467 AsyncRequestFuture ars = ap.submit(task); 1468 verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); 1469 Assert.assertEquals(2, ap.getReplicaCallCount()); 1470 } 1471 1472 @Test 1473 public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception { 1474 // Main call succeeds before replica calls are kicked off. 1475 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); 1476 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1477 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1478 .setPool(ap.service) 1479 .setRpcTimeout(RPC_TIMEOUT) 1480 .setOperationTimeout(OPERATION_TIMEOUT) 1481 .setTableName(DUMMY_TABLE) 1482 .setRowAccess(rows) 1483 .setResults(new Object[3]) 1484 .setSubmittedRows(SubmittedRows.ALL) 1485 .build(); 1486 AsyncRequestFuture ars = ap.submit(task); 1487 verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); 1488 Assert.assertEquals(0, ap.getReplicaCallCount()); 1489 } 1490 1491 @Test 1492 public void testReplicaParallelCallsSucceed() throws Exception { 1493 // Either main or replica can succeed. 1494 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); 1495 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1496 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1497 .setPool(ap.service) 1498 .setRpcTimeout(RPC_TIMEOUT) 1499 .setOperationTimeout(OPERATION_TIMEOUT) 1500 .setTableName(DUMMY_TABLE) 1501 .setRowAccess(rows) 1502 .setResults(new Object[2]) 1503 .setSubmittedRows(SubmittedRows.ALL) 1504 .build(); 1505 AsyncRequestFuture ars = ap.submit(task); 1506 verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); 1507 long replicaCalls = ap.getReplicaCallCount(); 1508 Assert.assertTrue(replicaCalls >= 0); 1509 Assert.assertTrue(replicaCalls <= 2); 1510 } 1511 1512 @Test 1513 public void testReplicaPartialReplicaCall() throws Exception { 1514 // One server is slow, so the result for its region comes from replica, whereas 1515 // the result for other region comes from primary before replica calls happen. 1516 // There should be no replica call for that region at all. 1517 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); 1518 ap.setPrimaryCallDelay(sn2, 2000); 1519 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1520 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1521 .setPool(ap.service) 1522 .setRpcTimeout(RPC_TIMEOUT) 1523 .setOperationTimeout(OPERATION_TIMEOUT) 1524 .setTableName(DUMMY_TABLE) 1525 .setRowAccess(rows) 1526 .setResults(new Object[2]) 1527 .setSubmittedRows(SubmittedRows.ALL) 1528 .build(); 1529 AsyncRequestFuture ars = ap.submit(task); 1530 verifyReplicaResult(ars, RR.FALSE, RR.TRUE); 1531 Assert.assertEquals(1, ap.getReplicaCallCount()); 1532 } 1533 1534 @Test 1535 public void testReplicaMainFailsBeforeReplicaCalls() throws Exception { 1536 // Main calls fail before replica calls can start - this is currently not handled. 1537 // It would probably never happen if we can get location (due to retries), 1538 // and it would require additional synchronization. 1539 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); 1540 ap.addFailures(hri1, hri2); 1541 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1542 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1543 .setPool(ap.service) 1544 .setRpcTimeout(RPC_TIMEOUT) 1545 .setOperationTimeout(OPERATION_TIMEOUT) 1546 .setTableName(DUMMY_TABLE) 1547 .setRowAccess(rows) 1548 .setResults(new Object[2]) 1549 .setSubmittedRows(SubmittedRows.ALL) 1550 .build(); 1551 AsyncRequestFuture ars = ap.submit(task); 1552 verifyReplicaResult(ars, RR.FAILED, RR.FAILED); 1553 Assert.assertEquals(0, ap.getReplicaCallCount()); 1554 } 1555 1556 @Test 1557 public void testReplicaReplicaSuccessWithParallelFailures() throws Exception { 1558 // Main calls fails after replica calls start. For two-replica region, one replica call 1559 // also fails. Regardless, we get replica results for both regions. 1560 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); 1561 ap.addFailures(hri1, hri1r2, hri2); 1562 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1563 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1564 .setPool(ap.service) 1565 .setRpcTimeout(RPC_TIMEOUT) 1566 .setOperationTimeout(OPERATION_TIMEOUT) 1567 .setTableName(DUMMY_TABLE) 1568 .setRowAccess(rows) 1569 .setResults(new Object[2]) 1570 .setSubmittedRows(SubmittedRows.ALL) 1571 .build(); 1572 AsyncRequestFuture ars = ap.submit(task); 1573 verifyReplicaResult(ars, RR.TRUE, RR.TRUE); 1574 Assert.assertEquals(2, ap.getReplicaCallCount()); 1575 } 1576 1577 @Test 1578 public void testReplicaAllCallsFailForOneRegion() throws Exception { 1579 // For one of the region, all 3, main and replica, calls fail. For the other, replica 1580 // call fails but its exception should not be visible as it did succeed. 1581 MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); 1582 ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); 1583 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1584 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1585 .setPool(ap.service) 1586 .setRpcTimeout(RPC_TIMEOUT) 1587 .setOperationTimeout(OPERATION_TIMEOUT) 1588 .setTableName(DUMMY_TABLE) 1589 .setRowAccess(rows) 1590 .setResults(new Object[2]) 1591 .setSubmittedRows(SubmittedRows.ALL) 1592 .build(); 1593 AsyncRequestFuture ars = ap.submit(task); 1594 verifyReplicaResult(ars, RR.FAILED, RR.FALSE); 1595 // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 1596 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); 1597 for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) { 1598 Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow()); 1599 } 1600 } 1601 1602 private MyAsyncProcessWithReplicas createReplicaAp( 1603 int replicaAfterMs, int primaryMs, int replicaMs) throws Exception { 1604 return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1); 1605 } 1606 1607 private MyAsyncProcessWithReplicas createReplicaAp( 1608 int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception { 1609 // TODO: this is kind of timing dependent... perhaps it should detect from createCaller 1610 // that the replica call has happened and that way control the ordering. 1611 Configuration conf = new Configuration(); 1612 ClusterConnection conn = createHConnectionWithReplicas(); 1613 conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); 1614 if (retries >= 0) { 1615 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1616 } 1617 MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf); 1618 ap.setCallDelays(primaryMs, replicaMs); 1619 return ap; 1620 } 1621 1622 private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, 1623 TableName name) { 1624 return new BufferedMutatorParams(name) 1625 .pool(ap.service) 1626 .rpcTimeout(RPC_TIMEOUT) 1627 .opertationTimeout(OPERATION_TIMEOUT); 1628 } 1629 1630 private static List<Get> makeTimelineGets(byte[]... rows) { 1631 List<Get> result = new ArrayList<>(rows.length); 1632 for (byte[] row : rows) { 1633 Get get = new Get(row); 1634 get.setConsistency(Consistency.TIMELINE); 1635 result.add(get); 1636 } 1637 return result; 1638 } 1639 1640 private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception { 1641 Object[] actual = ars.getResults(); 1642 Assert.assertEquals(expected.length, actual.length); 1643 for (int i = 0; i < expected.length; ++i) { 1644 Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable)); 1645 } 1646 } 1647 1648 /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */ 1649 private enum RR { 1650 TRUE, 1651 FALSE, 1652 DONT_CARE, 1653 FAILED 1654 } 1655 1656 private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception { 1657 Object[] actuals = ars.getResults(); 1658 Assert.assertEquals(expecteds.length, actuals.length); 1659 for (int i = 0; i < expecteds.length; ++i) { 1660 Object actual = actuals[i]; 1661 RR expected = expecteds[i]; 1662 Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable); 1663 if (expected != RR.FAILED && expected != RR.DONT_CARE) { 1664 Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale()); 1665 } 1666 } 1667 } 1668 1669 /** 1670 * @param regCnt the region: 1 to 3. 1671 * @param success if true, the put will succeed. 1672 * @return a put 1673 */ 1674 private Put createPut(int regCnt, boolean success) { 1675 Put p; 1676 if (!success) { 1677 p = new Put(FAILS); 1678 } else switch (regCnt){ 1679 case 1 : 1680 p = new Put(DUMMY_BYTES_1); 1681 break; 1682 case 2: 1683 p = new Put(DUMMY_BYTES_2); 1684 break; 1685 case 3: 1686 p = new Put(DUMMY_BYTES_3); 1687 break; 1688 default: 1689 throw new IllegalArgumentException("unknown " + regCnt); 1690 } 1691 1692 p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1693 1694 return p; 1695 } 1696 1697 static class MyThreadPoolExecutor extends ThreadPoolExecutor { 1698 public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime, 1699 TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) { 1700 super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue); 1701 } 1702 1703 @Override 1704 public Future submit(Runnable runnable) { 1705 throw new OutOfMemoryError("OutOfMemory error thrown by means"); 1706 } 1707 } 1708 1709 static class AsyncProcessForThrowableCheck extends AsyncProcess { 1710 public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { 1711 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory( 1712 conf)); 1713 } 1714 } 1715 1716 @Test 1717 public void testUncheckedException() throws Exception { 1718 // Test the case pool.submit throws unchecked exception 1719 ClusterConnection hc = createHConnection(); 1720 MyThreadPoolExecutor myPool = 1721 new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, 1722 new LinkedBlockingQueue<>(200)); 1723 AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF); 1724 1725 List<Put> puts = new ArrayList<>(1); 1726 puts.add(createPut(1, true)); 1727 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1728 .setPool(myPool) 1729 .setRpcTimeout(RPC_TIMEOUT) 1730 .setOperationTimeout(OPERATION_TIMEOUT) 1731 .setTableName(DUMMY_TABLE) 1732 .setRowAccess(puts) 1733 .setSubmittedRows(SubmittedRows.NORMAL) 1734 .build(); 1735 ap.submit(task); 1736 Assert.assertTrue(puts.isEmpty()); 1737 } 1738 1739 /** 1740 * Test and make sure we could use a special pause setting when retry with 1741 * CallQueueTooBigException, see HBASE-17114 1742 * @throws Exception if unexpected error happened during test 1743 */ 1744 @Test 1745 public void testRetryPauseWithCallQueueTooBigException() throws Exception { 1746 Configuration myConf = new Configuration(CONF); 1747 final long specialPause = 500L; 1748 final int retries = 1; 1749 myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); 1750 myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1751 ClusterConnection conn = new MyConnectionImpl(myConf); 1752 AsyncProcessWithFailure ap = 1753 new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); 1754 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1755 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1756 1757 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1758 1759 Put p = createPut(1, true); 1760 mutator.mutate(p); 1761 1762 long startTime = System.currentTimeMillis(); 1763 try { 1764 mutator.flush(); 1765 Assert.fail(); 1766 } catch (RetriesExhaustedWithDetailsException expected) { 1767 assertEquals(1, expected.getNumExceptions()); 1768 assertTrue(expected.getRow(0) == p); 1769 } 1770 long actualSleep = System.currentTimeMillis() - startTime; 1771 long expectedSleep = 0L; 1772 for (int i = 0; i < retries; i++) { 1773 expectedSleep += ConnectionUtils.getPauseTime(specialPause, i); 1774 // Prevent jitter in CollectionUtils#getPauseTime to affect result 1775 actualSleep += (long) (specialPause * 0.01f); 1776 } 1777 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1778 Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms", 1779 actualSleep >= expectedSleep); 1780 1781 // check and confirm normal IOE will use the normal pause 1782 final long normalPause = 1783 myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 1784 ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); 1785 bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1786 mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1787 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1788 mutator.mutate(p); 1789 startTime = System.currentTimeMillis(); 1790 try { 1791 mutator.flush(); 1792 Assert.fail(); 1793 } catch (RetriesExhaustedWithDetailsException expected) { 1794 assertEquals(1, expected.getNumExceptions()); 1795 assertTrue(expected.getRow(0) == p); 1796 } 1797 actualSleep = System.currentTimeMillis() - startTime; 1798 expectedSleep = 0L; 1799 for (int i = 0; i < retries; i++) { 1800 expectedSleep += ConnectionUtils.getPauseTime(normalPause, i); 1801 } 1802 // plus an additional pause to balance the program execution time 1803 expectedSleep += normalPause; 1804 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1805 Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); 1806 } 1807 1808 @Test 1809 public void testRetryWithExceptionClearsMetaCache() throws Exception { 1810 ClusterConnection conn = createHConnection(); 1811 Configuration myConf = conn.getConfiguration(); 1812 myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 1813 1814 AsyncProcessWithFailure ap = 1815 new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test")); 1816 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1817 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1818 1819 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1820 1821 Assert.assertEquals( 1822 conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(), 1823 new RegionLocations(loc1).toString()); 1824 1825 Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any()); 1826 1827 Put p = createPut(1, true); 1828 mutator.mutate(p); 1829 1830 try { 1831 mutator.flush(); 1832 Assert.fail(); 1833 } catch (RetriesExhaustedWithDetailsException expected) { 1834 assertEquals(1, expected.getNumExceptions()); 1835 assertTrue(expected.getRow(0) == p); 1836 } 1837 1838 Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName()); 1839 } 1840 1841 @Test 1842 public void testQueueRowAccess() throws Exception { 1843 ClusterConnection conn = createHConnection(); 1844 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, 1845 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); 1846 Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1847 Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 1848 mutator.mutate(p0); 1849 BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess(); 1850 // QueueRowAccess should take all undealt mutations 1851 assertEquals(0, mutator.size()); 1852 mutator.mutate(p1); 1853 assertEquals(1, mutator.size()); 1854 BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess(); 1855 // QueueRowAccess should take all undealt mutations 1856 assertEquals(0, mutator.size()); 1857 assertEquals(1, ra0.size()); 1858 assertEquals(1, ra1.size()); 1859 Iterator<Row> iter0 = ra0.iterator(); 1860 Iterator<Row> iter1 = ra1.iterator(); 1861 assertTrue(iter0.hasNext()); 1862 assertTrue(iter1.hasNext()); 1863 // the next() will poll the mutation from inner buffer and update the buffer count 1864 assertTrue(iter0.next() == p0); 1865 assertEquals(1, mutator.getUnflushedSize()); 1866 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1867 assertTrue(iter1.next() == p1); 1868 assertEquals(0, mutator.getUnflushedSize()); 1869 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1870 assertFalse(iter0.hasNext()); 1871 assertFalse(iter1.hasNext()); 1872 // ra0 doest handle the mutation so the mutation won't be pushed back to buffer 1873 iter0.remove(); 1874 ra0.close(); 1875 assertEquals(0, mutator.size()); 1876 assertEquals(0, mutator.getUnflushedSize()); 1877 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1878 // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer 1879 ra1.close(); 1880 assertEquals(1, mutator.size()); 1881 assertEquals(1, mutator.getUnflushedSize()); 1882 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1883 } 1884}