001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034import java.util.Random; 035import java.util.Set; 036import java.util.TreeSet; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.Executors; 041import java.util.concurrent.Future; 042import java.util.concurrent.LinkedBlockingQueue; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.hbase.CallQueueTooBigException; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseClassTestRule; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.HRegionInfo; 056import org.apache.hadoop.hbase.HRegionLocation; 057import org.apache.hadoop.hbase.RegionLocations; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess; 061import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; 062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 063import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 064import org.apache.hadoop.hbase.client.coprocessor.Batch; 065import org.apache.hadoop.hbase.exceptions.RegionOpeningException; 066import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 067import org.apache.hadoop.hbase.testclassification.ClientTests; 068import org.apache.hadoop.hbase.testclassification.LargeTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.Threads; 071import org.junit.Assert; 072import org.junit.Before; 073import org.junit.ClassRule; 074import org.junit.Test; 075import org.junit.experimental.categories.Category; 076import org.mockito.Mockito; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080@Category({ClientTests.class, LargeTests.class}) 081public class TestAsyncProcess { 082 083 @ClassRule 084 public static final HBaseClassTestRule CLASS_RULE = 085 HBaseClassTestRule.forClass(TestAsyncProcess.class); 086 087 private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class); 088 private static final TableName DUMMY_TABLE = 089 TableName.valueOf("DUMMY_TABLE"); 090 private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1"); 091 private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); 092 private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); 093 private static final byte[] FAILS = Bytes.toBytes("FAILS"); 094 private Configuration CONF; 095 private ConnectionConfiguration CONNECTION_CONFIG; 096 private static final ServerName sn = ServerName.valueOf("s1,1,1"); 097 private static final ServerName sn2 = ServerName.valueOf("s2,2,2"); 098 private static final ServerName sn3 = ServerName.valueOf("s3,3,3"); 099 private static final HRegionInfo hri1 = 100 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); 101 private static final HRegionInfo hri2 = 102 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); 103 private static final HRegionInfo hri3 = 104 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); 105 private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn); 106 private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn); 107 private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2); 108 109 // Replica stuff 110 private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1); 111 private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2); 112 private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); 113 private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), 114 new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)); 115 private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2), 116 new HRegionLocation(hri2r1, sn3)); 117 private static final RegionLocations hrls3 = 118 new RegionLocations(new HRegionLocation(hri3, sn3), null); 119 120 private static final String success = "success"; 121 private static Exception failure = new Exception("failure"); 122 123 private static final int NB_RETRIES = 3; 124 125 private int RPC_TIMEOUT; 126 private int OPERATION_TIMEOUT; 127 128 @Before 129 public void beforeEach() { 130 this.CONF = new Configuration(); 131 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); 132 this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF); 133 this.RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 134 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 135 this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 136 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 137 } 138 139 static class CountingThreadFactory implements ThreadFactory { 140 final AtomicInteger nbThreads; 141 ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess"); 142 @Override 143 public Thread newThread(Runnable r) { 144 nbThreads.incrementAndGet(); 145 return realFactory.newThread(r); 146 } 147 148 CountingThreadFactory(AtomicInteger nbThreads){ 149 this.nbThreads = nbThreads; 150 } 151 } 152 153 static class MyAsyncProcess extends AsyncProcess { 154 final AtomicInteger nbMultiResponse = new AtomicInteger(); 155 final AtomicInteger nbActions = new AtomicInteger(); 156 public List<AsyncRequestFuture> allReqs = new ArrayList<>(); 157 public AtomicInteger callsCt = new AtomicInteger(); 158 private Configuration conf; 159 160 private long previousTimeout = -1; 161 final ExecutorService service; 162 @Override 163 protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture( 164 AsyncProcessTask task, List<Action> actions, long nonceGroup) { 165 // Test HTable has tableName of null, so pass DUMMY_TABLE 166 AsyncProcessTask wrap = new AsyncProcessTask(task){ 167 @Override 168 public TableName getTableName() { 169 return DUMMY_TABLE; 170 } 171 }; 172 AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<>( 173 wrap, actions, nonceGroup, this); 174 allReqs.add(r); 175 return r; 176 } 177 178 public MyAsyncProcess(ClusterConnection hc, Configuration conf) { 179 super(hc, conf, 180 new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 181 service = Executors.newFixedThreadPool(5); 182 this.conf = conf; 183 } 184 185 public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { 186 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); 187 service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, 188 new SynchronousQueue<>(), new CountingThreadFactory(nbThreads)); 189 } 190 191 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, 192 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, 193 boolean needResults) throws InterruptedIOException { 194 AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) 195 .setPool(pool == null ? service : pool) 196 .setTableName(tableName) 197 .setRowAccess(rows) 198 .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) 199 .setNeedResults(needResults) 200 .setRpcTimeout(conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, 201 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)) 202 .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 203 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)) 204 .build(); 205 return submit(task); 206 } 207 208 public <CResult> AsyncRequestFuture submit(TableName tableName, 209 final List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, 210 boolean needResults) throws InterruptedIOException { 211 return submit(null, tableName, rows, atLeastOne, callback, needResults); 212 } 213 214 @Override 215 public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task) 216 throws InterruptedIOException { 217 previousTimeout = task.getRpcTimeout(); 218 // We use results in tests to check things, so override to always save them. 219 AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) { 220 @Override 221 public boolean getNeedResults() { 222 return true; 223 } 224 }; 225 return super.submit(wrap); 226 } 227 228 @Override 229 protected RpcRetryingCaller<AbstractResponse> createCaller( 230 CancellableRegionServerCallable callable, int rpcTimeout) { 231 callsCt.incrementAndGet(); 232 MultiServerCallable callable1 = (MultiServerCallable) callable; 233 final MultiResponse mr = createMultiResponse( 234 callable1.getMulti(), nbMultiResponse, nbActions, 235 new ResponseGenerator() { 236 @Override 237 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 238 if (Arrays.equals(FAILS, a.getAction().getRow())) { 239 mr.add(regionName, a.getOriginalIndex(), failure); 240 } else { 241 mr.add(regionName, a.getOriginalIndex(), success); 242 } 243 } 244 }); 245 246 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { 247 @Override 248 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 249 int callTimeout) 250 throws IOException, RuntimeException { 251 try { 252 // sleep one second in order for threadpool to start another thread instead of reusing 253 // existing one. 254 Thread.sleep(1000); 255 } catch (InterruptedException e) { 256 // ignore error 257 } 258 return mr; 259 } 260 }; 261 } 262 263 264 } 265 266 static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { 267 private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); 268 public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, 269 long nonceGroup, AsyncProcess asyncProcess) { 270 super(task, actions, nonceGroup, asyncProcess); 271 } 272 273 @Override 274 protected void updateStats(ServerName server, MultiResponse resp) { 275 // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. 276 } 277 278 Map<ServerName, List<Long>> getRequestHeapSize() { 279 return heapSizesByServer; 280 } 281 282 @Override 283 SingleServerRequestRunnable createSingleServerRequest( 284 MultiAction multiAction, int numAttempt, ServerName server, 285 Set<CancellableRegionServerCallable> callsInProgress) { 286 SingleServerRequestRunnable rq = new SingleServerRequestRunnable( 287 multiAction, numAttempt, server, callsInProgress); 288 List<Long> heapCount = heapSizesByServer.get(server); 289 if (heapCount == null) { 290 heapCount = new ArrayList<>(); 291 heapSizesByServer.put(server, heapCount); 292 } 293 heapCount.add(heapSizeOf(multiAction)); 294 return rq; 295 } 296 297 private long heapSizeOf(MultiAction multiAction) { 298 return multiAction.actions.values().stream() 299 .flatMap(v -> v.stream()) 300 .map(action -> action.getAction()) 301 .filter(row -> row instanceof Mutation) 302 .mapToLong(row -> ((Mutation) row).heapSize()) 303 .sum(); 304 } 305 } 306 307 static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{ 308 309 private final IOException e; 310 311 public CallerWithFailure(IOException e) { 312 super(100, 500, 100, 9); 313 this.e = e; 314 } 315 316 @Override 317 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 318 int callTimeout) 319 throws IOException, RuntimeException { 320 throw e; 321 } 322 } 323 324 325 static class AsyncProcessWithFailure extends MyAsyncProcess { 326 327 private final IOException ioe; 328 329 public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) { 330 super(hc, conf); 331 this.ioe = ioe; 332 serverTrackerTimeout = 1L; 333 } 334 335 @Override 336 protected RpcRetryingCaller<AbstractResponse> createCaller( 337 CancellableRegionServerCallable callable, int rpcTimeout) { 338 callsCt.incrementAndGet(); 339 return new CallerWithFailure(ioe); 340 } 341 } 342 343 /** 344 * Make the backoff time always different on each call. 345 */ 346 static class MyClientBackoffPolicy implements ClientBackoffPolicy { 347 private final Map<ServerName, AtomicInteger> count = new HashMap<>(); 348 @Override 349 public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { 350 AtomicInteger inc = count.get(serverName); 351 if (inc == null) { 352 inc = new AtomicInteger(0); 353 count.put(serverName, inc); 354 } 355 return inc.getAndIncrement(); 356 } 357 } 358 359 static class MyAsyncProcessWithReplicas extends MyAsyncProcess { 360 private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator()); 361 private long primarySleepMs = 0, replicaSleepMs = 0; 362 private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>(); 363 private final AtomicLong replicaCalls = new AtomicLong(0); 364 365 public void addFailures(RegionInfo... hris) { 366 for (RegionInfo hri : hris) { 367 failures.add(hri.getRegionName()); 368 } 369 } 370 371 public long getReplicaCallCount() { 372 return replicaCalls.get(); 373 } 374 375 public void setPrimaryCallDelay(ServerName server, long primaryMs) { 376 customPrimarySleepMs.put(server, primaryMs); 377 } 378 379 public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) { 380 super(hc, conf); 381 } 382 383 public void setCallDelays(long primaryMs, long replicaMs) { 384 this.primarySleepMs = primaryMs; 385 this.replicaSleepMs = replicaMs; 386 } 387 388 @Override 389 protected RpcRetryingCaller<AbstractResponse> createCaller( 390 CancellableRegionServerCallable payloadCallable, int rpcTimeout) { 391 MultiServerCallable callable = (MultiServerCallable) payloadCallable; 392 final MultiResponse mr = createMultiResponse( 393 callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { 394 @Override 395 public void addResponse(MultiResponse mr, byte[] regionName, Action a) { 396 if (failures.contains(regionName)) { 397 mr.add(regionName, a.getOriginalIndex(), failure); 398 } else { 399 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId()); 400 mr.add(regionName, a.getOriginalIndex(), 401 Result.create(new Cell[0], null, isStale)); 402 } 403 } 404 }); 405 // Currently AsyncProcess either sends all-replica, or all-primary request. 406 final boolean isDefault = RegionReplicaUtil.isDefaultReplica( 407 callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId()); 408 final ServerName server = ((MultiServerCallable)callable).getServerName(); 409 String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " 410 + callable.getMulti().actions.size() + " entries: "; 411 for (byte[] region : callable.getMulti().actions.keySet()) { 412 debugMsg += "[" + Bytes.toStringBinary(region) + "], "; 413 } 414 LOG.debug(debugMsg); 415 if (!isDefault) { 416 replicaCalls.incrementAndGet(); 417 } 418 419 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { 420 @Override 421 public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, 422 int callTimeout) 423 throws IOException, RuntimeException { 424 long sleep = -1; 425 if (isDefault) { 426 Long customSleep = customPrimarySleepMs.get(server); 427 sleep = (customSleep == null ? primarySleepMs : customSleep.longValue()); 428 } else { 429 sleep = replicaSleepMs; 430 } 431 if (sleep != 0) { 432 try { 433 Thread.sleep(sleep); 434 } catch (InterruptedException e) { 435 } 436 } 437 return mr; 438 } 439 }; 440 } 441 } 442 443 static MultiResponse createMultiResponse(final MultiAction multi, 444 AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) { 445 final MultiResponse mr = new MultiResponse(); 446 nbMultiResponse.incrementAndGet(); 447 for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) { 448 byte[] regionName = entry.getKey(); 449 for (Action a : entry.getValue()) { 450 nbActions.incrementAndGet(); 451 gen.addResponse(mr, regionName, a); 452 } 453 } 454 return mr; 455 } 456 457 private static interface ResponseGenerator { 458 void addResponse(final MultiResponse mr, byte[] regionName, Action a); 459 } 460 461 /** 462 * Returns our async process. 463 */ 464 static class MyConnectionImpl extends ConnectionImplementation { 465 public static class TestRegistry extends DoNothingConnectionRegistry { 466 467 public TestRegistry(Configuration conf) { 468 super(conf); 469 } 470 471 @Override 472 public CompletableFuture<String> getClusterId() { 473 return CompletableFuture.completedFuture("testClusterId"); 474 } 475 } 476 477 final AtomicInteger nbThreads = new AtomicInteger(0); 478 479 protected MyConnectionImpl(Configuration conf) throws IOException { 480 super(setupConf(conf), null, null); 481 } 482 483 private static Configuration setupConf(Configuration conf) { 484 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 485 TestRegistry.class, ConnectionRegistry.class); 486 return conf; 487 } 488 489 @Override 490 public RegionLocations locateRegion(TableName tableName, 491 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { 492 return new RegionLocations(loc1); 493 } 494 495 @Override 496 public boolean hasCellBlockSupport() { 497 return false; 498 } 499 } 500 501 /** 502 * Returns our async process. 503 */ 504 static class MyConnectionImpl2 extends MyConnectionImpl { 505 List<HRegionLocation> hrl; 506 final boolean usedRegions[]; 507 508 protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException { 509 super(conf); 510 this.hrl = hrl; 511 this.usedRegions = new boolean[hrl.size()]; 512 } 513 514 @Override 515 public RegionLocations locateRegion(TableName tableName, 516 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { 517 int i = 0; 518 for (HRegionLocation hr : hrl){ 519 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { 520 usedRegions[i] = true; 521 return new RegionLocations(hr); 522 } 523 i++; 524 } 525 return null; 526 } 527 } 528 @Test 529 public void testListRowAccess() { 530 int count = 10; 531 List<String> values = new LinkedList<>(); 532 for (int i = 0; i != count; ++i) { 533 values.add(String.valueOf(i)); 534 } 535 536 ListRowAccess<String> taker = new ListRowAccess(values); 537 assertEquals(count, taker.size()); 538 539 int restoreCount = 0; 540 int takeCount = 0; 541 Iterator<String> it = taker.iterator(); 542 while (it.hasNext()) { 543 String v = it.next(); 544 assertEquals(String.valueOf(takeCount), v); 545 ++takeCount; 546 it.remove(); 547 if (Math.random() >= 0.5) { 548 break; 549 } 550 } 551 assertEquals(count, taker.size() + takeCount); 552 553 it = taker.iterator(); 554 while (it.hasNext()) { 555 String v = it.next(); 556 assertEquals(String.valueOf(takeCount), v); 557 ++takeCount; 558 it.remove(); 559 } 560 assertEquals(0, taker.size()); 561 assertEquals(count, takeCount); 562 } 563 private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) { 564 if (putSizePerServer <= maxHeapSizePerRequest) { 565 return 1; 566 } else if (putSizePerServer % maxHeapSizePerRequest == 0) { 567 return putSizePerServer / maxHeapSizePerRequest; 568 } else { 569 return putSizePerServer / maxHeapSizePerRequest + 1; 570 } 571 } 572 573 @Test 574 public void testSubmitSameSizeOfRequest() throws Exception { 575 long writeBuffer = 2 * 1024 * 1024; 576 long putsHeapSize = writeBuffer; 577 doSubmitRequest(writeBuffer, putsHeapSize); 578 } 579 580 @Test 581 public void testSubmitLargeRequestWithUnlimitedSize() throws Exception { 582 long maxHeapSizePerRequest = Long.MAX_VALUE; 583 long putsHeapSize = 2 * 1024 * 1024; 584 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 585 } 586 587 @Test 588 public void testSubmitRandomSizeRequest() throws Exception { 589 Random rn = new Random(); 590 final long limit = 10 * 1024 * 1024; 591 final int requestCount = 1 + (int) (rn.nextDouble() * 3); 592 long n = rn.nextLong(); 593 if (n < 0) { 594 n = -n; 595 } else if (n == 0) { 596 n = 1; 597 } 598 long putsHeapSize = n % limit; 599 long maxHeapSizePerRequest = putsHeapSize / requestCount; 600 LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest + 601 ", putsHeapSize=" + putsHeapSize); 602 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 603 } 604 605 @Test 606 public void testSubmitSmallRequest() throws Exception { 607 long maxHeapSizePerRequest = 2 * 1024 * 1024; 608 long putsHeapSize = 100; 609 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 610 } 611 612 @Test 613 public void testSubmitLargeRequest() throws Exception { 614 long maxHeapSizePerRequest = 2 * 1024 * 1024; 615 long putsHeapSize = maxHeapSizePerRequest * 2; 616 doSubmitRequest(maxHeapSizePerRequest, putsHeapSize); 617 } 618 619 private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception { 620 ClusterConnection conn = createHConnection(); 621 final String defaultClazz = 622 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 623 final long defaultHeapSizePerRequest = conn.getConfiguration().getLong( 624 SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 625 SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); 626 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 627 SimpleRequestController.class.getName()); 628 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 629 maxHeapSizePerRequest); 630 631 // sn has two regions 632 long putSizeSN = 0; 633 long putSizeSN2 = 0; 634 List<Put> puts = new ArrayList<>(); 635 while ((putSizeSN + putSizeSN2) <= putsHeapSize) { 636 Put put1 = new Put(DUMMY_BYTES_1); 637 put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 638 Put put2 = new Put(DUMMY_BYTES_2); 639 put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 640 Put put3 = new Put(DUMMY_BYTES_3); 641 put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3); 642 putSizeSN += (put1.heapSize() + put2.heapSize()); 643 putSizeSN2 += put3.heapSize(); 644 puts.add(put1); 645 puts.add(put2); 646 puts.add(put3); 647 } 648 649 int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest); 650 int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest); 651 LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN 652 + ", putSizeSN2:" + putSizeSN2 653 + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest 654 + ", minCountSnRequest:" + minCountSnRequest 655 + ", minCountSn2Request:" + minCountSn2Request); 656 657 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 658 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 659 try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) { 660 mutator.mutate(puts); 661 mutator.flush(); 662 List<AsyncRequestFuture> reqs = ap.allReqs; 663 664 int actualSnReqCount = 0; 665 int actualSn2ReqCount = 0; 666 for (AsyncRequestFuture req : reqs) { 667 if (!(req instanceof AsyncRequestFutureImpl)) { 668 continue; 669 } 670 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 671 if (ars.getRequestHeapSize().containsKey(sn)) { 672 ++actualSnReqCount; 673 } 674 if (ars.getRequestHeapSize().containsKey(sn2)) { 675 ++actualSn2ReqCount; 676 } 677 } 678 // If the server is busy, the actual count may be incremented. 679 assertEquals(true, minCountSnRequest <= actualSnReqCount); 680 assertEquals(true, minCountSn2Request <= actualSn2ReqCount); 681 Map<ServerName, Long> sizePerServers = new HashMap<>(); 682 for (AsyncRequestFuture req : reqs) { 683 if (!(req instanceof AsyncRequestFutureImpl)) { 684 continue; 685 } 686 MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; 687 Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); 688 for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { 689 long sum = 0; 690 for (long size : entry.getValue()) { 691 assertEquals(true, size <= maxHeapSizePerRequest); 692 sum += size; 693 } 694 assertEquals(true, sum <= maxHeapSizePerRequest); 695 long value = sizePerServers.getOrDefault(entry.getKey(), 0L); 696 sizePerServers.put(entry.getKey(), value + sum); 697 } 698 } 699 assertEquals(true, sizePerServers.containsKey(sn)); 700 assertEquals(true, sizePerServers.containsKey(sn2)); 701 assertEquals(false, sizePerServers.containsKey(sn3)); 702 assertEquals(putSizeSN, (long) sizePerServers.get(sn)); 703 assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); 704 } 705 // restore config. 706 conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, 707 defaultHeapSizePerRequest); 708 if (defaultClazz != null) { 709 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 710 defaultClazz); 711 } 712 } 713 714 @Test 715 public void testSubmit() throws Exception { 716 ClusterConnection hc = createHConnection(); 717 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 718 719 List<Put> puts = new ArrayList<>(1); 720 puts.add(createPut(1, true)); 721 722 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 723 Assert.assertTrue(puts.isEmpty()); 724 } 725 726 @Test 727 public void testSubmitWithCB() throws Exception { 728 ClusterConnection hc = createHConnection(); 729 final AtomicInteger updateCalled = new AtomicInteger(0); 730 Batch.Callback<Object> cb = new Batch.Callback<Object>() { 731 @Override 732 public void update(byte[] region, byte[] row, Object result) { 733 updateCalled.incrementAndGet(); 734 } 735 }; 736 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 737 738 List<Put> puts = new ArrayList<>(1); 739 puts.add(createPut(1, true)); 740 741 final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false); 742 Assert.assertTrue(puts.isEmpty()); 743 ars.waitUntilDone(); 744 Assert.assertEquals(1, updateCalled.get()); 745 } 746 747 @Test 748 public void testSubmitBusyRegion() throws Exception { 749 ClusterConnection conn = createHConnection(); 750 final String defaultClazz = 751 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 752 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 753 SimpleRequestController.class.getName()); 754 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 755 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 756 List<Put> puts = new ArrayList<>(1); 757 puts.add(createPut(1, true)); 758 759 for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) { 760 ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 761 } 762 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 763 Assert.assertEquals(puts.size(), 1); 764 765 ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn); 766 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 767 Assert.assertEquals(0, puts.size()); 768 if (defaultClazz != null) { 769 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 770 defaultClazz); 771 } 772 } 773 774 775 @Test 776 public void testSubmitBusyRegionServer() throws Exception { 777 ClusterConnection conn = createHConnection(); 778 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 779 final String defaultClazz = 780 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 781 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 782 SimpleRequestController.class.getName()); 783 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 784 controller.taskCounterPerServer.put(sn2, 785 new AtomicInteger(controller.maxConcurrentTasksPerServer)); 786 787 List<Put> puts = new ArrayList<>(4); 788 puts.add(createPut(1, true)); 789 puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy 790 puts.add(createPut(1, true)); // <== this one will make it, the region is already in 791 puts.add(createPut(2, true)); // <== new region, but the rs is ok 792 793 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 794 Assert.assertEquals(" puts=" + puts, 1, puts.size()); 795 796 controller.taskCounterPerServer.put(sn2, 797 new AtomicInteger(controller.maxConcurrentTasksPerServer - 1)); 798 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 799 Assert.assertTrue(puts.isEmpty()); 800 if (defaultClazz != null) { 801 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 802 defaultClazz); 803 } 804 } 805 806 @Test 807 public void testFail() throws Exception { 808 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 809 810 List<Put> puts = new ArrayList<>(1); 811 Put p = createPut(1, false); 812 puts.add(p); 813 814 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 815 Assert.assertEquals(0, puts.size()); 816 ars.waitUntilDone(); 817 verifyResult(ars, false); 818 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 819 820 Assert.assertEquals(1, ars.getErrors().exceptions.size()); 821 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 822 failure.equals(ars.getErrors().exceptions.get(0))); 823 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), 824 failure.equals(ars.getErrors().exceptions.get(0))); 825 826 Assert.assertEquals(1, ars.getFailedOperations().size()); 827 Assert.assertTrue("was: " + ars.getFailedOperations().get(0), 828 p.equals(ars.getFailedOperations().get(0))); 829 } 830 831 832 @Test 833 public void testSubmitTrue() throws IOException { 834 ClusterConnection conn = createHConnection(); 835 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 836 final String defaultClazz = 837 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 838 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 839 SimpleRequestController.class.getName()); 840 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 841 controller.tasksInProgress.incrementAndGet(); 842 final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion); 843 controller.taskCounterPerRegion.put(hri1.getRegionName(), ai); 844 845 final AtomicBoolean checkPoint = new AtomicBoolean(false); 846 final AtomicBoolean checkPoint2 = new AtomicBoolean(false); 847 848 Thread t = new Thread(){ 849 @Override 850 public void run(){ 851 Threads.sleep(1000); 852 Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent 853 ai.decrementAndGet(); 854 controller.tasksInProgress.decrementAndGet(); 855 checkPoint2.set(true); 856 } 857 }; 858 859 List<Put> puts = new ArrayList<>(1); 860 Put p = createPut(1, true); 861 puts.add(p); 862 863 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 864 Assert.assertFalse(puts.isEmpty()); 865 866 t.start(); 867 868 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 869 Assert.assertTrue(puts.isEmpty()); 870 871 checkPoint.set(true); 872 while (!checkPoint2.get()){ 873 Threads.sleep(1); 874 } 875 if (defaultClazz != null) { 876 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 877 defaultClazz); 878 } 879 } 880 881 @Test 882 public void testFailAndSuccess() throws Exception { 883 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 884 885 List<Put> puts = new ArrayList<>(3); 886 puts.add(createPut(1, false)); 887 puts.add(createPut(1, true)); 888 puts.add(createPut(1, true)); 889 890 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 891 Assert.assertTrue(puts.isEmpty()); 892 ars.waitUntilDone(); 893 verifyResult(ars, false, true, true); 894 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 895 ap.callsCt.set(0); 896 Assert.assertEquals(1, ars.getErrors().actions.size()); 897 898 puts.add(createPut(1, true)); 899 // Wait for AP to be free. While ars might have the result, ap counters are decreased later. 900 ap.waitForMaximumCurrentTasks(0, null); 901 ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 902 Assert.assertEquals(0, puts.size()); 903 ars.waitUntilDone(); 904 Assert.assertEquals(1, ap.callsCt.get()); 905 verifyResult(ars, true); 906 } 907 908 @Test 909 public void testFlush() throws Exception { 910 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF); 911 912 List<Put> puts = new ArrayList<>(3); 913 puts.add(createPut(1, false)); 914 puts.add(createPut(1, true)); 915 puts.add(createPut(1, true)); 916 917 AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true); 918 ars.waitUntilDone(); 919 verifyResult(ars, false, true, true); 920 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 921 922 Assert.assertEquals(1, ars.getFailedOperations().size()); 923 } 924 925 @Test 926 public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { 927 ClusterConnection hc = createHConnection(); 928 MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); 929 testTaskCount(ap); 930 } 931 932 @Test 933 public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { 934 Configuration copyConf = new Configuration(CONF); 935 copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); 936 MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); 937 ClusterConnection conn = createHConnection(); 938 Mockito.when(conn.getConfiguration()).thenReturn(copyConf); 939 Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); 940 Mockito.when(conn.getBackoffPolicy()).thenReturn(bp); 941 final String defaultClazz = 942 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 943 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 944 SimpleRequestController.class.getName()); 945 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 946 testTaskCount(ap); 947 if (defaultClazz != null) { 948 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 949 defaultClazz); 950 } 951 } 952 953 private void testTaskCount(MyAsyncProcess ap) 954 throws InterruptedIOException, InterruptedException { 955 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 956 List<Put> puts = new ArrayList<>(); 957 for (int i = 0; i != 3; ++i) { 958 puts.add(createPut(1, true)); 959 puts.add(createPut(2, true)); 960 puts.add(createPut(3, true)); 961 } 962 ap.submit(null, DUMMY_TABLE, puts, true, null, false); 963 ap.waitForMaximumCurrentTasks(0, null); 964 // More time to wait if there are incorrect task count. 965 TimeUnit.SECONDS.sleep(1); 966 assertEquals(0, controller.tasksInProgress.get()); 967 for (AtomicInteger count : controller.taskCounterPerRegion.values()) { 968 assertEquals(0, count.get()); 969 } 970 for (AtomicInteger count : controller.taskCounterPerServer.values()) { 971 assertEquals(0, count.get()); 972 } 973 } 974 975 @Test 976 public void testMaxTask() throws Exception { 977 ClusterConnection conn = createHConnection(); 978 final String defaultClazz = 979 conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); 980 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 981 SimpleRequestController.class.getName()); 982 final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 983 SimpleRequestController controller = (SimpleRequestController) ap.requestController; 984 985 986 for (int i = 0; i < 1000; i++) { 987 ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 988 } 989 990 final Thread myThread = Thread.currentThread(); 991 992 Thread t = new Thread() { 993 @Override 994 public void run() { 995 Threads.sleep(2000); 996 myThread.interrupt(); 997 } 998 }; 999 1000 List<Put> puts = new ArrayList<>(1); 1001 puts.add(createPut(1, true)); 1002 1003 t.start(); 1004 1005 try { 1006 ap.submit(null, DUMMY_TABLE, puts, false, null, false); 1007 Assert.fail("We should have been interrupted."); 1008 } catch (InterruptedIOException expected) { 1009 } 1010 1011 final long sleepTime = 2000; 1012 1013 Thread t2 = new Thread() { 1014 @Override 1015 public void run() { 1016 Threads.sleep(sleepTime); 1017 while (controller.tasksInProgress.get() > 0) { 1018 ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn); 1019 } 1020 } 1021 }; 1022 t2.start(); 1023 1024 long start = System.currentTimeMillis(); 1025 ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false); 1026 long end = System.currentTimeMillis(); 1027 1028 //Adds 100 to secure us against approximate timing. 1029 Assert.assertTrue(start + 100L + sleepTime > end); 1030 if (defaultClazz != null) { 1031 conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, 1032 defaultClazz); 1033 } 1034 } 1035 1036 private ClusterConnection createHConnection() throws IOException { 1037 ClusterConnection hc = createHConnectionCommon(); 1038 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); 1039 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); 1040 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); 1041 Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), 1042 Mockito.anyBoolean())).thenReturn(Arrays.asList(loc1, loc2, loc3)); 1043 setMockLocation(hc, FAILS, new RegionLocations(loc2)); 1044 return hc; 1045 } 1046 1047 private ClusterConnection createHConnectionWithReplicas() throws IOException { 1048 ClusterConnection hc = createHConnectionCommon(); 1049 setMockLocation(hc, DUMMY_BYTES_1, hrls1); 1050 setMockLocation(hc, DUMMY_BYTES_2, hrls2); 1051 setMockLocation(hc, DUMMY_BYTES_3, hrls3); 1052 List<HRegionLocation> locations = new ArrayList<>(); 1053 for (HRegionLocation loc : hrls1.getRegionLocations()) { 1054 locations.add(loc); 1055 } 1056 for (HRegionLocation loc : hrls2.getRegionLocations()) { 1057 locations.add(loc); 1058 } 1059 for (HRegionLocation loc : hrls3.getRegionLocations()) { 1060 locations.add(loc); 1061 } 1062 Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), 1063 Mockito.anyBoolean())).thenReturn(locations); 1064 return hc; 1065 } 1066 1067 private static void setMockLocation(ClusterConnection hc, byte[] row, 1068 RegionLocations result) throws IOException { 1069 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), 1070 Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); 1071 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), 1072 Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); 1073 } 1074 1075 private ClusterConnection createHConnectionCommon() { 1076 ClusterConnection hc = Mockito.mock(ClusterConnection.class); 1077 NonceGenerator ng = Mockito.mock(NonceGenerator.class); 1078 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); 1079 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); 1080 Mockito.when(hc.getConfiguration()).thenReturn(CONF); 1081 Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); 1082 return hc; 1083 } 1084 1085 @Test 1086 public void testHTablePutSuccess() throws Exception { 1087 ClusterConnection conn = createHConnection(); 1088 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1089 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1090 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1091 1092 Put put = createPut(1, true); 1093 1094 Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), 1095 ht.getWriteBufferSize()); 1096 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1097 ht.mutate(put); 1098 ht.flush(); 1099 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1100 } 1101 1102 @Test 1103 public void testSettingWriteBufferPeriodicFlushParameters() throws Exception { 1104 ClusterConnection conn = createHConnection(); 1105 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1106 1107 checkPeriodicFlushParameters(conn, ap, 1108 1234, 1234, 1109 1234, 1234); 1110 checkPeriodicFlushParameters(conn, ap, 1111 0, 0, 1112 0, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1113 checkPeriodicFlushParameters(conn, ap, 1114 -1234, 0, 1115 -1234, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1116 checkPeriodicFlushParameters(conn, ap, 1117 1, 1, 1118 1, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 1119 } 1120 1121 private void checkPeriodicFlushParameters(ClusterConnection conn, 1122 MyAsyncProcess ap, 1123 long setTO, long expectTO, 1124 long setTT, long expectTT 1125 ) { 1126 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1127 1128 // The BufferedMutatorParams does nothing with the value 1129 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO); 1130 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT); 1131 Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs()); 1132 Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs()); 1133 1134 // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams) 1135 BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap); 1136 Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs()); 1137 Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs()); 1138 1139 // The BufferedMutatorImpl corrects illegal values (direct via setter) 1140 BufferedMutatorImpl ht2 = 1141 new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap); 1142 ht2.setWriteBufferPeriodicFlush(setTO, setTT); 1143 Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs()); 1144 Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs()); 1145 1146 } 1147 1148 @Test 1149 public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception { 1150 ClusterConnection conn = createHConnection(); 1151 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1152 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1153 1154 bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP 1155 bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms 1156 bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record 1157 1158 BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1159 1160 // Verify if BufferedMutator has the right settings. 1161 Assert.assertEquals(10000, ht.getWriteBufferSize()); 1162 Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs()); 1163 Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, 1164 ht.getWriteBufferPeriodicFlushTimerTickMs()); 1165 1166 Put put = createPut(1, true); 1167 1168 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1169 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1170 1171 // ----- Insert, flush immediately, MUST NOT flush automatically 1172 ht.mutate(put); 1173 ht.flush(); 1174 1175 Thread.sleep(1000); 1176 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1177 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1178 1179 // ----- Insert, NO flush, MUST flush automatically 1180 ht.mutate(put); 1181 Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes()); 1182 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1183 1184 // The timerTick should fire every 100ms, so after twice that we must have 1185 // seen at least 1 tick and we should see an automatic flush 1186 Thread.sleep(200); 1187 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1188 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1189 1190 // Ensure it does not flush twice 1191 Thread.sleep(200); 1192 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1193 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1194 1195 // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically 1196 ht.disableWriteBufferPeriodicFlush(); 1197 ht.mutate(put); 1198 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1199 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1200 1201 // Wait for at least 1 timerTick, we should see NO flushes. 1202 Thread.sleep(200); 1203 Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes()); 1204 Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0); 1205 1206 // Reenable periodic flushing, a flush seems to take about 1 second 1207 // so we wait for 2 seconds and it should have finished the flush. 1208 ht.setWriteBufferPeriodicFlush(1, 100); 1209 Thread.sleep(2000); 1210 Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes()); 1211 Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); 1212 } 1213 1214 1215 @Test 1216 public void testBufferedMutatorImplWithSharedPool() throws Exception { 1217 ClusterConnection conn = createHConnection(); 1218 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1219 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1220 BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap); 1221 1222 ht.close(); 1223 assertFalse(ap.service.isShutdown()); 1224 } 1225 1226 @Test 1227 public void testFailedPutAndNewPut() throws Exception { 1228 ClusterConnection conn = createHConnection(); 1229 MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); 1230 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE) 1231 .writeBufferSize(0); 1232 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1233 1234 Put p = createPut(1, false); 1235 try { 1236 mutator.mutate(p); 1237 Assert.fail(); 1238 } catch (RetriesExhaustedWithDetailsException expected) { 1239 assertEquals(1, expected.getNumExceptions()); 1240 assertTrue(expected.getRow(0) == p); 1241 } 1242 // Let's do all the retries. 1243 ap.waitForMaximumCurrentTasks(0, null); 1244 Assert.assertEquals(0, mutator.size()); 1245 1246 // There is no global error so the new put should not fail 1247 mutator.mutate(createPut(1, true)); 1248 Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); 1249 } 1250 1251 @SuppressWarnings("SelfComparison") 1252 @Test 1253 public void testAction() { 1254 Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10); 1255 Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1256 Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10); 1257 Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10); 1258 assertFalse(action_0.equals(action_1)); 1259 assertTrue(action_0.equals(action_0)); 1260 assertTrue(action_1.equals(action_2)); 1261 assertTrue(action_2.equals(action_1)); 1262 assertFalse(action_0.equals(new Put(Bytes.toBytes("abc")))); 1263 assertTrue(action_2.equals(action_3)); 1264 assertFalse(action_0.equals(action_3)); 1265 assertEquals(0, action_0.compareTo(action_0)); 1266 assertTrue(action_0.compareTo(action_1) < 0); 1267 assertTrue(action_1.compareTo(action_0) > 0); 1268 assertEquals(0, action_1.compareTo(action_2)); 1269 } 1270 1271 @Test 1272 public void testBatch() throws IOException, InterruptedException { 1273 ClusterConnection conn = new MyConnectionImpl(CONF); 1274 HTable ht = (HTable) conn.getTable(DUMMY_TABLE); 1275 ht.multiAp = new MyAsyncProcess(conn, CONF); 1276 1277 List<Put> puts = new ArrayList<>(7); 1278 puts.add(createPut(1, true)); 1279 puts.add(createPut(1, true)); 1280 puts.add(createPut(1, true)); 1281 puts.add(createPut(1, true)); 1282 puts.add(createPut(1, false)); // <=== the bad apple, position 4 1283 puts.add(createPut(1, true)); 1284 puts.add(createPut(1, false)); // <=== another bad apple, position 6 1285 1286 Object[] res = new Object[puts.size()]; 1287 try { 1288 ht.batch(puts, res); 1289 Assert.fail(); 1290 } catch (RetriesExhaustedException expected) { 1291 } 1292 1293 Assert.assertEquals(success, res[0]); 1294 Assert.assertEquals(success, res[1]); 1295 Assert.assertEquals(success, res[2]); 1296 Assert.assertEquals(success, res[3]); 1297 Assert.assertEquals(failure, res[4]); 1298 Assert.assertEquals(success, res[5]); 1299 Assert.assertEquals(failure, res[6]); 1300 } 1301 @Test 1302 public void testErrorsServers() throws IOException { 1303 Configuration configuration = new Configuration(CONF); 1304 ClusterConnection conn = new MyConnectionImpl(configuration); 1305 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration); 1306 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1307 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1308 configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); 1309 1310 Assert.assertNotNull(ap.createServerErrorTracker()); 1311 Assert.assertTrue(ap.serverTrackerTimeout > 200L); 1312 ap.serverTrackerTimeout = 1L; 1313 1314 Put p = createPut(1, false); 1315 mutator.mutate(p); 1316 1317 try { 1318 mutator.flush(); 1319 Assert.fail(); 1320 } catch (RetriesExhaustedWithDetailsException expected) { 1321 assertEquals(1, expected.getNumExceptions()); 1322 assertTrue(expected.getRow(0) == p); 1323 } 1324 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1325 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1326 } 1327 1328 @Test 1329 public void testReadAndWriteTimeout() throws IOException { 1330 final long readTimeout = 10 * 1000; 1331 final long writeTimeout = 20 * 1000; 1332 Configuration copyConf = new Configuration(CONF); 1333 copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); 1334 copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); 1335 ClusterConnection conn = new MyConnectionImpl(copyConf); 1336 MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf); 1337 try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) { 1338 ht.multiAp = ap; 1339 List<Get> gets = new LinkedList<>(); 1340 gets.add(new Get(DUMMY_BYTES_1)); 1341 gets.add(new Get(DUMMY_BYTES_2)); 1342 try { 1343 ht.get(gets); 1344 } catch (ClassCastException e) { 1345 // No result response on this test. 1346 } 1347 assertEquals(readTimeout, ap.previousTimeout); 1348 ap.previousTimeout = -1; 1349 1350 try { 1351 ht.existsAll(gets); 1352 } catch (ClassCastException e) { 1353 // No result response on this test. 1354 } 1355 assertEquals(readTimeout, ap.previousTimeout); 1356 ap.previousTimeout = -1; 1357 1358 List<Delete> deletes = new LinkedList<>(); 1359 deletes.add(new Delete(DUMMY_BYTES_1)); 1360 deletes.add(new Delete(DUMMY_BYTES_2)); 1361 ht.delete(deletes); 1362 assertEquals(writeTimeout, ap.previousTimeout); 1363 } 1364 } 1365 1366 @Test 1367 public void testErrors() throws IOException { 1368 ClusterConnection conn = new MyConnectionImpl(CONF); 1369 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); 1370 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1371 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1372 1373 Assert.assertNotNull(ap.createServerErrorTracker()); 1374 1375 Put p = createPut(1, true); 1376 mutator.mutate(p); 1377 1378 try { 1379 mutator.flush(); 1380 Assert.fail(); 1381 } catch (RetriesExhaustedWithDetailsException expected) { 1382 assertEquals(1, expected.getNumExceptions()); 1383 assertTrue(expected.getRow(0) == p); 1384 } 1385 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1386 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1387 } 1388 1389 1390 @Test 1391 public void testCallQueueTooLarge() throws IOException { 1392 ClusterConnection conn = new MyConnectionImpl(CONF); 1393 AsyncProcessWithFailure ap = 1394 new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException()); 1395 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1396 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1397 Assert.assertNotNull(ap.createServerErrorTracker()); 1398 Put p = createPut(1, true); 1399 mutator.mutate(p); 1400 1401 try { 1402 mutator.flush(); 1403 Assert.fail(); 1404 } catch (RetriesExhaustedWithDetailsException expected) { 1405 assertEquals(1, expected.getNumExceptions()); 1406 assertTrue(expected.getRow(0) == p); 1407 } 1408 // Checking that the ErrorsServers came into play and didn't make us stop immediately 1409 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); 1410 } 1411 /** 1412 * This test simulates multiple regions on 2 servers. We should have 2 multi requests and 1413 * 2 threads: 1 per server, this whatever the number of regions. 1414 */ 1415 @Test 1416 public void testThreadCreation() throws Exception { 1417 final int NB_REGS = 100; 1418 List<HRegionLocation> hrls = new ArrayList<>(NB_REGS); 1419 List<Get> gets = new ArrayList<>(NB_REGS); 1420 for (int i = 0; i < NB_REGS; i++) { 1421 HRegionInfo hri = new HRegionInfo( 1422 DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i); 1423 HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2); 1424 hrls.add(hrl); 1425 1426 Get get = new Get(Bytes.toBytes(i * 10L)); 1427 gets.add(get); 1428 } 1429 1430 MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF); 1431 MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); 1432 HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service); 1433 ht.multiAp = ap; 1434 ht.batch(gets, null); 1435 1436 Assert.assertEquals(NB_REGS, ap.nbActions.get()); 1437 Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); 1438 Assert.assertEquals("1 thread per server", 2, con.nbThreads.get()); 1439 1440 int nbReg = 0; 1441 for (int i =0; i<NB_REGS; i++){ 1442 if (con.usedRegions[i]) nbReg++; 1443 } 1444 Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg); 1445 } 1446 1447 @Test 1448 public void testReplicaReplicaSuccess() throws Exception { 1449 // Main call takes too long so replicas succeed, except for one region w/o replicas. 1450 // One region has no replica, so the main call succeeds for it. 1451 MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); 1452 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1453 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1454 .setPool(ap.service) 1455 .setRpcTimeout(RPC_TIMEOUT) 1456 .setOperationTimeout(OPERATION_TIMEOUT) 1457 .setTableName(DUMMY_TABLE) 1458 .setRowAccess(rows) 1459 .setResults(new Object[3]) 1460 .setSubmittedRows(SubmittedRows.ALL) 1461 .build(); 1462 AsyncRequestFuture ars = ap.submit(task); 1463 verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); 1464 Assert.assertEquals(2, ap.getReplicaCallCount()); 1465 } 1466 1467 @Test 1468 public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception { 1469 // Main call succeeds before replica calls are kicked off. 1470 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); 1471 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); 1472 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1473 .setPool(ap.service) 1474 .setRpcTimeout(RPC_TIMEOUT) 1475 .setOperationTimeout(OPERATION_TIMEOUT) 1476 .setTableName(DUMMY_TABLE) 1477 .setRowAccess(rows) 1478 .setResults(new Object[3]) 1479 .setSubmittedRows(SubmittedRows.ALL) 1480 .build(); 1481 AsyncRequestFuture ars = ap.submit(task); 1482 verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); 1483 Assert.assertEquals(0, ap.getReplicaCallCount()); 1484 } 1485 1486 @Test 1487 public void testReplicaParallelCallsSucceed() throws Exception { 1488 // Either main or replica can succeed. 1489 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); 1490 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1491 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1492 .setPool(ap.service) 1493 .setRpcTimeout(RPC_TIMEOUT) 1494 .setOperationTimeout(OPERATION_TIMEOUT) 1495 .setTableName(DUMMY_TABLE) 1496 .setRowAccess(rows) 1497 .setResults(new Object[2]) 1498 .setSubmittedRows(SubmittedRows.ALL) 1499 .build(); 1500 AsyncRequestFuture ars = ap.submit(task); 1501 verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); 1502 long replicaCalls = ap.getReplicaCallCount(); 1503 Assert.assertTrue(replicaCalls >= 0); 1504 Assert.assertTrue(replicaCalls <= 2); 1505 } 1506 1507 @Test 1508 public void testReplicaPartialReplicaCall() throws Exception { 1509 // One server is slow, so the result for its region comes from replica, whereas 1510 // the result for other region comes from primary before replica calls happen. 1511 // There should be no replica call for that region at all. 1512 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); 1513 ap.setPrimaryCallDelay(sn2, 2000); 1514 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1515 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1516 .setPool(ap.service) 1517 .setRpcTimeout(RPC_TIMEOUT) 1518 .setOperationTimeout(OPERATION_TIMEOUT) 1519 .setTableName(DUMMY_TABLE) 1520 .setRowAccess(rows) 1521 .setResults(new Object[2]) 1522 .setSubmittedRows(SubmittedRows.ALL) 1523 .build(); 1524 AsyncRequestFuture ars = ap.submit(task); 1525 verifyReplicaResult(ars, RR.FALSE, RR.TRUE); 1526 Assert.assertEquals(1, ap.getReplicaCallCount()); 1527 } 1528 1529 @Test 1530 public void testReplicaMainFailsBeforeReplicaCalls() throws Exception { 1531 // Main calls fail before replica calls can start - this is currently not handled. 1532 // It would probably never happen if we can get location (due to retries), 1533 // and it would require additional synchronization. 1534 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); 1535 ap.addFailures(hri1, hri2); 1536 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1537 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1538 .setPool(ap.service) 1539 .setRpcTimeout(RPC_TIMEOUT) 1540 .setOperationTimeout(OPERATION_TIMEOUT) 1541 .setTableName(DUMMY_TABLE) 1542 .setRowAccess(rows) 1543 .setResults(new Object[2]) 1544 .setSubmittedRows(SubmittedRows.ALL) 1545 .build(); 1546 AsyncRequestFuture ars = ap.submit(task); 1547 verifyReplicaResult(ars, RR.FAILED, RR.FAILED); 1548 Assert.assertEquals(0, ap.getReplicaCallCount()); 1549 } 1550 1551 @Test 1552 public void testReplicaReplicaSuccessWithParallelFailures() throws Exception { 1553 // Main calls fails after replica calls start. For two-replica region, one replica call 1554 // also fails. Regardless, we get replica results for both regions. 1555 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); 1556 ap.addFailures(hri1, hri1r2, hri2); 1557 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1558 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1559 .setPool(ap.service) 1560 .setRpcTimeout(RPC_TIMEOUT) 1561 .setOperationTimeout(OPERATION_TIMEOUT) 1562 .setTableName(DUMMY_TABLE) 1563 .setRowAccess(rows) 1564 .setResults(new Object[2]) 1565 .setSubmittedRows(SubmittedRows.ALL) 1566 .build(); 1567 AsyncRequestFuture ars = ap.submit(task); 1568 verifyReplicaResult(ars, RR.TRUE, RR.TRUE); 1569 Assert.assertEquals(2, ap.getReplicaCallCount()); 1570 } 1571 1572 @Test 1573 public void testReplicaAllCallsFailForOneRegion() throws Exception { 1574 // For one of the region, all 3, main and replica, calls fail. For the other, replica 1575 // call fails but its exception should not be visible as it did succeed. 1576 MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); 1577 ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); 1578 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); 1579 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1580 .setPool(ap.service) 1581 .setRpcTimeout(RPC_TIMEOUT) 1582 .setOperationTimeout(OPERATION_TIMEOUT) 1583 .setTableName(DUMMY_TABLE) 1584 .setRowAccess(rows) 1585 .setResults(new Object[2]) 1586 .setSubmittedRows(SubmittedRows.ALL) 1587 .build(); 1588 AsyncRequestFuture ars = ap.submit(task); 1589 verifyReplicaResult(ars, RR.FAILED, RR.FALSE); 1590 // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 1591 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); 1592 for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) { 1593 Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow()); 1594 } 1595 } 1596 1597 private MyAsyncProcessWithReplicas createReplicaAp( 1598 int replicaAfterMs, int primaryMs, int replicaMs) throws Exception { 1599 return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1); 1600 } 1601 1602 private MyAsyncProcessWithReplicas createReplicaAp( 1603 int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception { 1604 // TODO: this is kind of timing dependent... perhaps it should detect from createCaller 1605 // that the replica call has happened and that way control the ordering. 1606 Configuration conf = new Configuration(); 1607 ClusterConnection conn = createHConnectionWithReplicas(); 1608 conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); 1609 if (retries >= 0) { 1610 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1611 } 1612 MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf); 1613 ap.setCallDelays(primaryMs, replicaMs); 1614 return ap; 1615 } 1616 1617 private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, 1618 TableName name) { 1619 return new BufferedMutatorParams(name) 1620 .pool(ap.service) 1621 .rpcTimeout(RPC_TIMEOUT) 1622 .opertationTimeout(OPERATION_TIMEOUT); 1623 } 1624 1625 private static List<Get> makeTimelineGets(byte[]... rows) { 1626 List<Get> result = new ArrayList<>(rows.length); 1627 for (byte[] row : rows) { 1628 Get get = new Get(row); 1629 get.setConsistency(Consistency.TIMELINE); 1630 result.add(get); 1631 } 1632 return result; 1633 } 1634 1635 private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception { 1636 Object[] actual = ars.getResults(); 1637 Assert.assertEquals(expected.length, actual.length); 1638 for (int i = 0; i < expected.length; ++i) { 1639 Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable)); 1640 } 1641 } 1642 1643 /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */ 1644 private enum RR { 1645 TRUE, 1646 FALSE, 1647 DONT_CARE, 1648 FAILED 1649 } 1650 1651 private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception { 1652 Object[] actuals = ars.getResults(); 1653 Assert.assertEquals(expecteds.length, actuals.length); 1654 for (int i = 0; i < expecteds.length; ++i) { 1655 Object actual = actuals[i]; 1656 RR expected = expecteds[i]; 1657 Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable); 1658 if (expected != RR.FAILED && expected != RR.DONT_CARE) { 1659 Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale()); 1660 } 1661 } 1662 } 1663 1664 /** 1665 * @param regCnt the region: 1 to 3. 1666 * @param success if true, the put will succeed. 1667 * @return a put 1668 */ 1669 private Put createPut(int regCnt, boolean success) { 1670 Put p; 1671 if (!success) { 1672 p = new Put(FAILS); 1673 } else switch (regCnt){ 1674 case 1 : 1675 p = new Put(DUMMY_BYTES_1); 1676 break; 1677 case 2: 1678 p = new Put(DUMMY_BYTES_2); 1679 break; 1680 case 3: 1681 p = new Put(DUMMY_BYTES_3); 1682 break; 1683 default: 1684 throw new IllegalArgumentException("unknown " + regCnt); 1685 } 1686 1687 p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1688 1689 return p; 1690 } 1691 1692 static class MyThreadPoolExecutor extends ThreadPoolExecutor { 1693 public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime, 1694 TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) { 1695 super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue); 1696 } 1697 1698 @Override 1699 public Future submit(Runnable runnable) { 1700 throw new OutOfMemoryError("OutOfMemory error thrown by means"); 1701 } 1702 } 1703 1704 static class AsyncProcessForThrowableCheck extends AsyncProcess { 1705 public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { 1706 super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory( 1707 conf)); 1708 } 1709 } 1710 1711 @Test 1712 public void testUncheckedException() throws Exception { 1713 // Test the case pool.submit throws unchecked exception 1714 ClusterConnection hc = createHConnection(); 1715 MyThreadPoolExecutor myPool = 1716 new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, 1717 new LinkedBlockingQueue<>(200)); 1718 AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF); 1719 1720 List<Put> puts = new ArrayList<>(1); 1721 puts.add(createPut(1, true)); 1722 AsyncProcessTask task = AsyncProcessTask.newBuilder() 1723 .setPool(myPool) 1724 .setRpcTimeout(RPC_TIMEOUT) 1725 .setOperationTimeout(OPERATION_TIMEOUT) 1726 .setTableName(DUMMY_TABLE) 1727 .setRowAccess(puts) 1728 .setSubmittedRows(SubmittedRows.NORMAL) 1729 .build(); 1730 ap.submit(task); 1731 Assert.assertTrue(puts.isEmpty()); 1732 } 1733 1734 /** 1735 * Test and make sure we could use a special pause setting when retry with 1736 * CallQueueTooBigException, see HBASE-17114 1737 * @throws Exception if unexpected error happened during test 1738 */ 1739 @Test 1740 public void testRetryPauseWithCallQueueTooBigException() throws Exception { 1741 Configuration myConf = new Configuration(CONF); 1742 final long specialPause = 500L; 1743 final int retries = 1; 1744 myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); 1745 myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); 1746 ClusterConnection conn = new MyConnectionImpl(myConf); 1747 AsyncProcessWithFailure ap = 1748 new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); 1749 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1750 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1751 1752 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1753 1754 Put p = createPut(1, true); 1755 mutator.mutate(p); 1756 1757 long startTime = System.currentTimeMillis(); 1758 try { 1759 mutator.flush(); 1760 Assert.fail(); 1761 } catch (RetriesExhaustedWithDetailsException expected) { 1762 assertEquals(1, expected.getNumExceptions()); 1763 assertTrue(expected.getRow(0) == p); 1764 } 1765 long actualSleep = System.currentTimeMillis() - startTime; 1766 long expectedSleep = 0L; 1767 for (int i = 0; i < retries; i++) { 1768 expectedSleep += ConnectionUtils.getPauseTime(specialPause, i); 1769 // Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result 1770 actualSleep += (long) (specialPause * 0.01f); 1771 } 1772 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1773 Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms", 1774 actualSleep >= expectedSleep); 1775 1776 // check and confirm normal IOE will use the normal pause 1777 final long normalPause = 1778 myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 1779 ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); 1780 bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1781 mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1782 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1783 mutator.mutate(p); 1784 startTime = System.currentTimeMillis(); 1785 try { 1786 mutator.flush(); 1787 Assert.fail(); 1788 } catch (RetriesExhaustedWithDetailsException expected) { 1789 assertEquals(1, expected.getNumExceptions()); 1790 assertTrue(expected.getRow(0) == p); 1791 } 1792 actualSleep = System.currentTimeMillis() - startTime; 1793 expectedSleep = 0L; 1794 for (int i = 0; i < retries; i++) { 1795 expectedSleep += ConnectionUtils.getPauseTime(normalPause, i); 1796 } 1797 // plus an additional pause to balance the program execution time 1798 expectedSleep += normalPause; 1799 LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); 1800 Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); 1801 } 1802 1803 @Test 1804 public void testRetryWithExceptionClearsMetaCache() throws Exception { 1805 ClusterConnection conn = createHConnection(); 1806 Configuration myConf = conn.getConfiguration(); 1807 myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 1808 1809 AsyncProcessWithFailure ap = 1810 new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test")); 1811 BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); 1812 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); 1813 1814 Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 1815 1816 Assert.assertEquals( 1817 conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(), 1818 new RegionLocations(loc1).toString()); 1819 1820 Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any()); 1821 1822 Put p = createPut(1, true); 1823 mutator.mutate(p); 1824 1825 try { 1826 mutator.flush(); 1827 Assert.fail(); 1828 } catch (RetriesExhaustedWithDetailsException expected) { 1829 assertEquals(1, expected.getNumExceptions()); 1830 assertTrue(expected.getRow(0) == p); 1831 } 1832 1833 Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName()); 1834 } 1835 1836 @Test 1837 public void testQueueRowAccess() throws Exception { 1838 ClusterConnection conn = createHConnection(); 1839 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, 1840 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); 1841 Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); 1842 Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); 1843 mutator.mutate(p0); 1844 BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess(); 1845 // QueueRowAccess should take all undealt mutations 1846 assertEquals(0, mutator.size()); 1847 mutator.mutate(p1); 1848 assertEquals(1, mutator.size()); 1849 BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess(); 1850 // QueueRowAccess should take all undealt mutations 1851 assertEquals(0, mutator.size()); 1852 assertEquals(1, ra0.size()); 1853 assertEquals(1, ra1.size()); 1854 Iterator<Row> iter0 = ra0.iterator(); 1855 Iterator<Row> iter1 = ra1.iterator(); 1856 assertTrue(iter0.hasNext()); 1857 assertTrue(iter1.hasNext()); 1858 // the next() will poll the mutation from inner buffer and update the buffer count 1859 assertTrue(iter0.next() == p0); 1860 assertEquals(1, mutator.getUnflushedSize()); 1861 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1862 assertTrue(iter1.next() == p1); 1863 assertEquals(0, mutator.getUnflushedSize()); 1864 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1865 assertFalse(iter0.hasNext()); 1866 assertFalse(iter1.hasNext()); 1867 // ra0 doest handle the mutation so the mutation won't be pushed back to buffer 1868 iter0.remove(); 1869 ra0.close(); 1870 assertEquals(0, mutator.size()); 1871 assertEquals(0, mutator.getUnflushedSize()); 1872 assertEquals(0, mutator.getCurrentWriteBufferSize()); 1873 // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer 1874 ra1.close(); 1875 assertEquals(1, mutator.size()); 1876 assertEquals(1, mutator.getUnflushedSize()); 1877 assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); 1878 } 1879}