001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import com.codahale.metrics.Counter; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Optional; 027import java.util.Set; 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.concurrent.atomic.AtomicLong; 033import java.util.concurrent.atomic.AtomicReference; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HRegionInfo; 040import org.apache.hadoop.hbase.HTableDescriptor; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.NotServingRegionException; 043import org.apache.hadoop.hbase.RegionLocations; 044import org.apache.hadoop.hbase.StartMiniClusterOption; 045import org.apache.hadoop.hbase.TableNotFoundException; 046import org.apache.hadoop.hbase.coprocessor.ObserverContext; 047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 049import org.apache.hadoop.hbase.coprocessor.RegionObserver; 050import org.apache.hadoop.hbase.regionserver.HRegionServer; 051import org.apache.hadoop.hbase.regionserver.InternalScanner; 052import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; 053import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; 054import org.apache.hadoop.hbase.testclassification.ClientTests; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.zookeeper.KeeperException; 058import org.junit.After; 059import org.junit.AfterClass; 060import org.junit.Assert; 061import org.junit.Before; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Ignore; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 071import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 073 074/** 075 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole 076 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}. 077 */ 078@Category({LargeTests.class, ClientTests.class}) 079@SuppressWarnings("deprecation") 080public class TestReplicasClient { 081 082 @ClassRule 083 public static final HBaseClassTestRule CLASS_RULE = 084 HBaseClassTestRule.forClass(TestReplicasClient.class); 085 086 private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class); 087 088 private static final int NB_SERVERS = 1; 089 private static Table table = null; 090 private static final byte[] row = TestReplicasClient.class.getName().getBytes(); 091 092 private static HRegionInfo hriPrimary; 093 private static HRegionInfo hriSecondary; 094 095 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 096 private static final byte[] f = HConstants.CATALOG_FAMILY; 097 098 private final static int REFRESH_PERIOD = 1000; 099 100 /** 101 * This copro is used to synchronize the tests. 102 */ 103 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver { 104 static final AtomicLong sleepTime = new AtomicLong(0); 105 static final AtomicBoolean slowDownNext = new AtomicBoolean(false); 106 static final AtomicInteger countOfNext = new AtomicInteger(0); 107 private static final AtomicReference<CountDownLatch> primaryCdl = 108 new AtomicReference<>(new CountDownLatch(0)); 109 private static final AtomicReference<CountDownLatch> secondaryCdl = 110 new AtomicReference<>(new CountDownLatch(0)); 111 public SlowMeCopro() { 112 } 113 114 @Override 115 public Optional<RegionObserver> getRegionObserver() { 116 return Optional.of(this); 117 } 118 119 @Override 120 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, 121 final Get get, final List<Cell> results) throws IOException { 122 slowdownCode(e); 123 } 124 125 @Override 126 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, 127 final Scan scan) throws IOException { 128 slowdownCode(e); 129 } 130 131 @Override 132 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, 133 final InternalScanner s, final List<Result> results, 134 final int limit, final boolean hasMore) throws IOException { 135 //this will slow down a certain next operation if the conditions are met. The slowness 136 //will allow the call to go to a replica 137 if (slowDownNext.get()) { 138 //have some "next" return successfully from the primary; hence countOfNext checked 139 if (countOfNext.incrementAndGet() == 2) { 140 sleepTime.set(2000); 141 slowdownCode(e); 142 } 143 } 144 return true; 145 } 146 147 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) { 148 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { 149 LOG.info("We're the primary replicas."); 150 CountDownLatch latch = getPrimaryCdl().get(); 151 try { 152 if (sleepTime.get() > 0) { 153 LOG.info("Sleeping for " + sleepTime.get() + " ms"); 154 Thread.sleep(sleepTime.get()); 155 } else if (latch.getCount() > 0) { 156 LOG.info("Waiting for the counterCountDownLatch"); 157 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 158 if (latch.getCount() > 0) { 159 throw new RuntimeException("Can't wait more"); 160 } 161 } 162 } catch (InterruptedException e1) { 163 LOG.error(e1.toString(), e1); 164 } 165 } else { 166 LOG.info("We're not the primary replicas."); 167 CountDownLatch latch = getSecondaryCdl().get(); 168 try { 169 if (latch.getCount() > 0) { 170 LOG.info("Waiting for the secondary counterCountDownLatch"); 171 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 172 if (latch.getCount() > 0) { 173 throw new RuntimeException("Can't wait more"); 174 } 175 } 176 } catch (InterruptedException e1) { 177 LOG.error(e1.toString(), e1); 178 } 179 } 180 } 181 182 public static AtomicReference<CountDownLatch> getPrimaryCdl() { 183 return primaryCdl; 184 } 185 186 public static AtomicReference<CountDownLatch> getSecondaryCdl() { 187 return secondaryCdl; 188 } 189 } 190 191 @BeforeClass 192 public static void beforeClass() throws Exception { 193 // enable store file refreshing 194 HTU.getConfiguration().setInt( 195 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD); 196 HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); 197 HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 198 ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration()); 199 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1). 200 numAlwaysStandByMasters(1).numMasters(1).build(); 201 HTU.startMiniCluster(option); 202 203 // Create table then get the single region for our new table. 204 HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName()); 205 hdt.addCoprocessor(SlowMeCopro.class.getName()); 206 table = HTU.createTable(hdt, new byte[][]{f}, null); 207 208 try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) { 209 hriPrimary = locator.getRegionLocation(row, false).getRegionInfo(); 210 } 211 212 // mock a secondary region info to open 213 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), 214 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); 215 216 // No master 217 LOG.info("Master is going to be stopped"); 218 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); 219 Configuration c = new Configuration(HTU.getConfiguration()); 220 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 221 LOG.info("Master has stopped"); 222 } 223 224 @AfterClass 225 public static void afterClass() throws Exception { 226 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; 227 if (table != null) table.close(); 228 HTU.shutdownMiniCluster(); 229 } 230 231 @Before 232 public void before() throws IOException { 233 ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionLocationCache(); 234 try { 235 openRegion(hriPrimary); 236 } catch (Exception ignored) { 237 } 238 try { 239 openRegion(hriSecondary); 240 } catch (Exception ignored) { 241 } 242 } 243 244 @After 245 public void after() throws IOException, KeeperException { 246 try { 247 closeRegion(hriSecondary); 248 } catch (Exception ignored) { 249 } 250 try { 251 closeRegion(hriPrimary); 252 } catch (Exception ignored) { 253 } 254 255 ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionLocationCache(); 256 } 257 258 private HRegionServer getRS() { 259 return HTU.getMiniHBaseCluster().getRegionServer(0); 260 } 261 262 private void openRegion(HRegionInfo hri) throws Exception { 263 try { 264 if (isRegionOpened(hri)) return; 265 } catch (Exception e){} 266 // first version is '0' 267 AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest( 268 getRS().getServerName(), hri, null); 269 AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr); 270 Assert.assertEquals(1, responseOpen.getOpeningStateCount()); 271 Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED, 272 responseOpen.getOpeningState(0)); 273 checkRegionIsOpened(hri); 274 } 275 276 private void closeRegion(HRegionInfo hri) throws Exception { 277 AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest( 278 getRS().getServerName(), hri.getRegionName()); 279 AdminProtos.CloseRegionResponse responseClose = getRS() 280 .getRSRpcServices().closeRegion(null, crr); 281 Assert.assertTrue(responseClose.getClosed()); 282 283 checkRegionIsClosed(hri.getEncodedName()); 284 } 285 286 private void checkRegionIsOpened(HRegionInfo hri) throws Exception { 287 while (!getRS().getRegionsInTransitionInRS().isEmpty()) { 288 Thread.sleep(1); 289 } 290 } 291 292 private boolean isRegionOpened(HRegionInfo hri) throws Exception { 293 return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable(); 294 } 295 296 private void checkRegionIsClosed(String encodedRegionName) throws Exception { 297 298 while (!getRS().getRegionsInTransitionInRS().isEmpty()) { 299 Thread.sleep(1); 300 } 301 302 try { 303 Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable()); 304 } catch (NotServingRegionException expected) { 305 // That's how it work: if the region is closed we have an exception. 306 } 307 308 // We don't delete the znode here, because there is not always a znode. 309 } 310 311 private void flushRegion(HRegionInfo regionInfo) throws IOException { 312 TestRegionServerNoMaster.flushRegion(HTU, regionInfo); 313 } 314 315 @Test 316 public void testUseRegionWithoutReplica() throws Exception { 317 byte[] b1 = "testUseRegionWithoutReplica".getBytes(); 318 openRegion(hriSecondary); 319 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0)); 320 try { 321 Get g = new Get(b1); 322 Result r = table.get(g); 323 Assert.assertFalse(r.isStale()); 324 } finally { 325 closeRegion(hriSecondary); 326 } 327 } 328 329 @Test 330 public void testLocations() throws Exception { 331 byte[] b1 = "testLocations".getBytes(); 332 openRegion(hriSecondary); 333 ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection(); 334 335 try { 336 hc.clearRegionLocationCache(); 337 RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false); 338 Assert.assertEquals(2, rl.size()); 339 340 rl = hc.locateRegion(table.getName(), b1, true, false); 341 Assert.assertEquals(2, rl.size()); 342 343 hc.clearRegionLocationCache(); 344 rl = hc.locateRegion(table.getName(), b1, true, false); 345 Assert.assertEquals(2, rl.size()); 346 347 rl = hc.locateRegion(table.getName(), b1, false, false); 348 Assert.assertEquals(2, rl.size()); 349 } finally { 350 closeRegion(hriSecondary); 351 } 352 } 353 354 @Test 355 public void testGetNoResultNoStaleRegionWithReplica() throws Exception { 356 byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes(); 357 openRegion(hriSecondary); 358 359 try { 360 // A get works and is not stale 361 Get g = new Get(b1); 362 Result r = table.get(g); 363 Assert.assertFalse(r.isStale()); 364 } finally { 365 closeRegion(hriSecondary); 366 } 367 } 368 369 370 @Test 371 public void testGetNoResultStaleRegionWithReplica() throws Exception { 372 byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes(); 373 openRegion(hriSecondary); 374 375 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 376 try { 377 Get g = new Get(b1); 378 g.setConsistency(Consistency.TIMELINE); 379 Result r = table.get(g); 380 Assert.assertTrue(r.isStale()); 381 } finally { 382 SlowMeCopro.getPrimaryCdl().get().countDown(); 383 closeRegion(hriSecondary); 384 } 385 } 386 387 @Test 388 public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception { 389 byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes(); 390 openRegion(hriSecondary); 391 392 try { 393 // We sleep; but we won't go to the stale region as we don't get the stale by default. 394 SlowMeCopro.sleepTime.set(2000); 395 Get g = new Get(b1); 396 Result r = table.get(g); 397 Assert.assertFalse(r.isStale()); 398 399 } finally { 400 SlowMeCopro.sleepTime.set(0); 401 closeRegion(hriSecondary); 402 } 403 } 404 405 @Test 406 public void testFlushTable() throws Exception { 407 openRegion(hriSecondary); 408 try { 409 flushRegion(hriPrimary); 410 flushRegion(hriSecondary); 411 412 Put p = new Put(row); 413 p.addColumn(f, row, row); 414 table.put(p); 415 416 flushRegion(hriPrimary); 417 flushRegion(hriSecondary); 418 } finally { 419 Delete d = new Delete(row); 420 table.delete(d); 421 closeRegion(hriSecondary); 422 } 423 } 424 425 @Test 426 public void testFlushPrimary() throws Exception { 427 openRegion(hriSecondary); 428 429 try { 430 flushRegion(hriPrimary); 431 432 Put p = new Put(row); 433 p.addColumn(f, row, row); 434 table.put(p); 435 436 flushRegion(hriPrimary); 437 } finally { 438 Delete d = new Delete(row); 439 table.delete(d); 440 closeRegion(hriSecondary); 441 } 442 } 443 444 @Test 445 public void testFlushSecondary() throws Exception { 446 openRegion(hriSecondary); 447 try { 448 flushRegion(hriSecondary); 449 450 Put p = new Put(row); 451 p.addColumn(f, row, row); 452 table.put(p); 453 454 flushRegion(hriSecondary); 455 } catch (TableNotFoundException expected) { 456 } finally { 457 Delete d = new Delete(row); 458 table.delete(d); 459 closeRegion(hriSecondary); 460 } 461 } 462 463 @Test 464 public void testUseRegionWithReplica() throws Exception { 465 byte[] b1 = "testUseRegionWithReplica".getBytes(); 466 openRegion(hriSecondary); 467 468 try { 469 // A simple put works, even if there here a second replica 470 Put p = new Put(b1); 471 p.addColumn(f, b1, b1); 472 table.put(p); 473 LOG.info("Put done"); 474 475 // A get works and is not stale 476 Get g = new Get(b1); 477 Result r = table.get(g); 478 Assert.assertFalse(r.isStale()); 479 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty()); 480 LOG.info("get works and is not stale done"); 481 482 // Even if it we have to wait a little on the main region 483 SlowMeCopro.sleepTime.set(2000); 484 g = new Get(b1); 485 r = table.get(g); 486 Assert.assertFalse(r.isStale()); 487 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty()); 488 SlowMeCopro.sleepTime.set(0); 489 LOG.info("sleep and is not stale done"); 490 491 // But if we ask for stale we will get it 492 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 493 g = new Get(b1); 494 g.setConsistency(Consistency.TIMELINE); 495 r = table.get(g); 496 Assert.assertTrue(r.isStale()); 497 Assert.assertTrue(r.getColumnCells(f, b1).isEmpty()); 498 SlowMeCopro.getPrimaryCdl().get().countDown(); 499 500 LOG.info("stale done"); 501 502 // exists works and is not stale 503 g = new Get(b1); 504 g.setCheckExistenceOnly(true); 505 r = table.get(g); 506 Assert.assertFalse(r.isStale()); 507 Assert.assertTrue(r.getExists()); 508 LOG.info("exists not stale done"); 509 510 // exists works on stale but don't see the put 511 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 512 g = new Get(b1); 513 g.setCheckExistenceOnly(true); 514 g.setConsistency(Consistency.TIMELINE); 515 r = table.get(g); 516 Assert.assertTrue(r.isStale()); 517 Assert.assertFalse("The secondary has stale data", r.getExists()); 518 SlowMeCopro.getPrimaryCdl().get().countDown(); 519 LOG.info("exists stale before flush done"); 520 521 flushRegion(hriPrimary); 522 flushRegion(hriSecondary); 523 LOG.info("flush done"); 524 Thread.sleep(1000 + REFRESH_PERIOD * 2); 525 526 // get works and is not stale 527 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 528 g = new Get(b1); 529 g.setConsistency(Consistency.TIMELINE); 530 r = table.get(g); 531 Assert.assertTrue(r.isStale()); 532 Assert.assertFalse(r.isEmpty()); 533 SlowMeCopro.getPrimaryCdl().get().countDown(); 534 LOG.info("stale done"); 535 536 // exists works on stale and we see the put after the flush 537 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 538 g = new Get(b1); 539 g.setCheckExistenceOnly(true); 540 g.setConsistency(Consistency.TIMELINE); 541 r = table.get(g); 542 Assert.assertTrue(r.isStale()); 543 Assert.assertTrue(r.getExists()); 544 SlowMeCopro.getPrimaryCdl().get().countDown(); 545 LOG.info("exists stale after flush done"); 546 547 } finally { 548 SlowMeCopro.getPrimaryCdl().get().countDown(); 549 SlowMeCopro.sleepTime.set(0); 550 Delete d = new Delete(b1); 551 table.delete(d); 552 closeRegion(hriSecondary); 553 } 554 } 555 556 @Test 557 public void testHedgedRead() throws Exception { 558 byte[] b1 = "testHedgedRead".getBytes(); 559 openRegion(hriSecondary); 560 561 try { 562 // A simple put works, even if there here a second replica 563 Put p = new Put(b1); 564 p.addColumn(f, b1, b1); 565 table.put(p); 566 LOG.info("Put done"); 567 568 // A get works and is not stale 569 Get g = new Get(b1); 570 Result r = table.get(g); 571 Assert.assertFalse(r.isStale()); 572 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty()); 573 LOG.info("get works and is not stale done"); 574 575 //reset 576 ClusterConnection connection = (ClusterConnection) HTU.getConnection(); 577 Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps; 578 Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin; 579 hedgedReadOps.dec(hedgedReadOps.getCount()); 580 hedgedReadWin.dec(hedgedReadWin.getCount()); 581 582 // Wait a little on the main region, just enough to happen once hedged read 583 // and hedged read did not returned faster 584 int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond(); 585 SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond)); 586 SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1)); 587 g = new Get(b1); 588 g.setConsistency(Consistency.TIMELINE); 589 r = table.get(g); 590 Assert.assertFalse(r.isStale()); 591 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty()); 592 Assert.assertEquals(1, hedgedReadOps.getCount()); 593 Assert.assertEquals(0, hedgedReadWin.getCount()); 594 SlowMeCopro.sleepTime.set(0); 595 SlowMeCopro.getSecondaryCdl().get().countDown(); 596 LOG.info("hedged read occurred but not faster"); 597 598 599 // But if we ask for stale we will get it and hedged read returned faster 600 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 601 g = new Get(b1); 602 g.setConsistency(Consistency.TIMELINE); 603 r = table.get(g); 604 Assert.assertTrue(r.isStale()); 605 Assert.assertTrue(r.getColumnCells(f, b1).isEmpty()); 606 Assert.assertEquals(2, hedgedReadOps.getCount()); 607 Assert.assertEquals(1, hedgedReadWin.getCount()); 608 SlowMeCopro.getPrimaryCdl().get().countDown(); 609 LOG.info("hedged read occurred and faster"); 610 611 } finally { 612 SlowMeCopro.getPrimaryCdl().get().countDown(); 613 SlowMeCopro.getSecondaryCdl().get().countDown(); 614 SlowMeCopro.sleepTime.set(0); 615 Delete d = new Delete(b1); 616 table.delete(d); 617 closeRegion(hriSecondary); 618 } 619 } 620 621 @Ignore // Disabled because it is flakey. Fails 17% on constrained GCE. %3 on Apache. 622 @Test 623 public void testCancelOfMultiGet() throws Exception { 624 openRegion(hriSecondary); 625 try { 626 List<Put> puts = new ArrayList<>(2); 627 byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0); 628 Put p = new Put(b1); 629 p.addColumn(f, b1, b1); 630 puts.add(p); 631 632 byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1); 633 p = new Put(b2); 634 p.addColumn(f, b2, b2); 635 puts.add(p); 636 table.put(puts); 637 LOG.debug("PUT done"); 638 flushRegion(hriPrimary); 639 LOG.info("flush done"); 640 641 Thread.sleep(1000 + REFRESH_PERIOD * 2); 642 643 AsyncProcess ap = ((ClusterConnection) HTU.getConnection()).getAsyncProcess(); 644 645 // Make primary slowdown 646 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); 647 648 List<Get> gets = new ArrayList<>(); 649 Get g = new Get(b1); 650 g.setCheckExistenceOnly(true); 651 g.setConsistency(Consistency.TIMELINE); 652 gets.add(g); 653 g = new Get(b2); 654 g.setCheckExistenceOnly(true); 655 g.setConsistency(Consistency.TIMELINE); 656 gets.add(g); 657 Object[] results = new Object[2]; 658 659 int operationTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getOperationTimeout(); 660 int readTimeout = ((ClusterConnection) HTU.getConnection()).getConnectionConfiguration().getReadRpcTimeout(); 661 AsyncProcessTask task = AsyncProcessTask.newBuilder() 662 .setPool(HTable.getDefaultExecutor(HTU.getConfiguration())) 663 .setTableName(table.getName()) 664 .setRowAccess(gets) 665 .setResults(results) 666 .setOperationTimeout(operationTimeout) 667 .setRpcTimeout(readTimeout) 668 .build(); 669 AsyncRequestFuture reqs = ap.submit(task); 670 reqs.waitUntilDone(); 671 // verify we got the right results back 672 for (Object r : results) { 673 Assert.assertTrue(((Result)r).isStale()); 674 Assert.assertTrue(((Result)r).getExists()); 675 } 676 Set<CancellableRegionServerCallable> set = 677 ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress(); 678 // verify we did cancel unneeded calls 679 Assert.assertTrue(!set.isEmpty()); 680 for (CancellableRegionServerCallable m : set) { 681 Assert.assertTrue(m.isCancelled()); 682 } 683 } finally { 684 SlowMeCopro.getPrimaryCdl().get().countDown(); 685 SlowMeCopro.sleepTime.set(0); 686 SlowMeCopro.slowDownNext.set(false); 687 SlowMeCopro.countOfNext.set(0); 688 for (int i = 0; i < 2; i++) { 689 byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i); 690 Delete d = new Delete(b1); 691 table.delete(d); 692 } 693 closeRegion(hriSecondary); 694 } 695 } 696 697 @Test 698 public void testScanWithReplicas() throws Exception { 699 //simple scan 700 runMultipleScansOfOneType(false, false); 701 } 702 703 @Test 704 public void testSmallScanWithReplicas() throws Exception { 705 //small scan 706 runMultipleScansOfOneType(false, true); 707 } 708 709 @Test 710 public void testReverseScanWithReplicas() throws Exception { 711 //reverse scan 712 runMultipleScansOfOneType(true, false); 713 } 714 715 @Test 716 public void testCancelOfScan() throws Exception { 717 openRegion(hriSecondary); 718 int NUMROWS = 100; 719 try { 720 for (int i = 0; i < NUMROWS; i++) { 721 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); 722 Put p = new Put(b1); 723 p.addColumn(f, b1, b1); 724 table.put(p); 725 } 726 LOG.debug("PUT done"); 727 int caching = 20; 728 byte[] start; 729 start = Bytes.toBytes("testUseRegionWithReplica" + 0); 730 731 flushRegion(hriPrimary); 732 LOG.info("flush done"); 733 Thread.sleep(1000 + REFRESH_PERIOD * 2); 734 735 // now make some 'next' calls slow 736 SlowMeCopro.slowDownNext.set(true); 737 SlowMeCopro.countOfNext.set(0); 738 SlowMeCopro.sleepTime.set(5000); 739 740 Scan scan = new Scan(start); 741 scan.setCaching(caching); 742 scan.setConsistency(Consistency.TIMELINE); 743 ResultScanner scanner = table.getScanner(scan); 744 Iterator<Result> iter = scanner.iterator(); 745 iter.next(); 746 Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled()); 747 SlowMeCopro.slowDownNext.set(false); 748 SlowMeCopro.countOfNext.set(0); 749 } finally { 750 SlowMeCopro.getPrimaryCdl().get().countDown(); 751 SlowMeCopro.sleepTime.set(0); 752 SlowMeCopro.slowDownNext.set(false); 753 SlowMeCopro.countOfNext.set(0); 754 for (int i = 0; i < NUMROWS; i++) { 755 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); 756 Delete d = new Delete(b1); 757 table.delete(d); 758 } 759 closeRegion(hriSecondary); 760 } 761 } 762 763 private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception { 764 openRegion(hriSecondary); 765 int NUMROWS = 100; 766 int NUMCOLS = 10; 767 try { 768 for (int i = 0; i < NUMROWS; i++) { 769 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); 770 for (int col = 0; col < NUMCOLS; col++) { 771 Put p = new Put(b1); 772 String qualifier = "qualifer" + col; 773 KeyValue kv = new KeyValue(b1, f, qualifier.getBytes()); 774 p.add(kv); 775 table.put(p); 776 } 777 } 778 LOG.debug("PUT done"); 779 int caching = 20; 780 long maxResultSize = Long.MAX_VALUE; 781 782 byte[] start; 783 if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1)); 784 else start = Bytes.toBytes("testUseRegionWithReplica" + 0); 785 786 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, 787 start, NUMROWS, NUMCOLS, false, false); 788 789 // Even if we were to slow the server down, unless we ask for stale 790 // we won't get it 791 SlowMeCopro.sleepTime.set(5000); 792 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS, 793 NUMCOLS, false, false); 794 SlowMeCopro.sleepTime.set(0); 795 796 flushRegion(hriPrimary); 797 LOG.info("flush done"); 798 Thread.sleep(1000 + REFRESH_PERIOD * 2); 799 800 //Now set the flag to get a response even if stale 801 SlowMeCopro.sleepTime.set(5000); 802 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, 803 start, NUMROWS, NUMCOLS, true, false); 804 SlowMeCopro.sleepTime.set(0); 805 806 // now make some 'next' calls slow 807 SlowMeCopro.slowDownNext.set(true); 808 SlowMeCopro.countOfNext.set(0); 809 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, 810 NUMROWS, NUMCOLS, true, true); 811 SlowMeCopro.slowDownNext.set(false); 812 SlowMeCopro.countOfNext.set(0); 813 814 // Make sure we do not get stale data.. 815 SlowMeCopro.sleepTime.set(5000); 816 scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, 817 start, NUMROWS, NUMCOLS, false, false); 818 SlowMeCopro.sleepTime.set(0); 819 820 // While the next calls are slow, set maxResultSize to 1 so that some partial results will be 821 // returned from the server before the replica switch occurs. 822 maxResultSize = 1; 823 SlowMeCopro.slowDownNext.set(true); 824 SlowMeCopro.countOfNext.set(0); 825 scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, 826 NUMROWS, NUMCOLS, true, true); 827 maxResultSize = Long.MAX_VALUE; 828 SlowMeCopro.slowDownNext.set(false); 829 SlowMeCopro.countOfNext.set(0); 830 } finally { 831 SlowMeCopro.getPrimaryCdl().get().countDown(); 832 SlowMeCopro.sleepTime.set(0); 833 SlowMeCopro.slowDownNext.set(false); 834 SlowMeCopro.countOfNext.set(0); 835 for (int i = 0; i < NUMROWS; i++) { 836 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i); 837 Delete d = new Delete(b1); 838 table.delete(d); 839 } 840 closeRegion(hriSecondary); 841 } 842 } 843 844 private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency, 845 int caching, long maxResultSize, byte[] startRow, int numRows, int numCols, 846 boolean staleExpected, boolean slowNext) 847 throws Exception { 848 Scan scan = new Scan(startRow); 849 scan.setCaching(caching); 850 scan.setMaxResultSize(maxResultSize); 851 scan.setReversed(reversed); 852 scan.setSmall(small); 853 scan.setConsistency(consistency); 854 ResultScanner scanner = table.getScanner(scan); 855 Iterator<Result> iter = scanner.iterator(); 856 857 // Maps of row keys that we have seen so far 858 HashMap<String, Boolean> map = new HashMap<>(); 859 860 // Tracked metrics 861 int rowCount = 0; 862 int cellCount = 0; 863 int countOfStale = 0; 864 865 while (iter.hasNext()) { 866 rowCount++; 867 Result r = iter.next(); 868 String row = new String(r.getRow()); 869 870 if (map.containsKey(row)) { 871 throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow())); 872 } 873 874 map.put(row, true); 875 876 for (Cell cell : r.rawCells()) { 877 cellCount++; 878 } 879 880 if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected); 881 if (r.isStale()) countOfStale++; 882 } 883 Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows, 884 rowCount == numRows); 885 Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols, 886 cellCount == (numRows * numCols)); 887 888 if (slowNext) { 889 LOG.debug("Count of Stale " + countOfStale); 890 Assert.assertTrue(countOfStale > 1); 891 892 // If the scan was configured in such a way that a full row was NOT retrieved before the 893 // replica switch occurred, then it is possible that all rows were stale 894 if (maxResultSize != Long.MAX_VALUE) { 895 Assert.assertTrue(countOfStale <= numRows); 896 } else { 897 Assert.assertTrue(countOfStale < numRows); 898 } 899 } 900 } 901}