001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import com.codahale.metrics.Counter; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Optional; 027import java.util.Random; 028import java.util.Set; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034import java.util.concurrent.atomic.AtomicReference; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionInfo; 041import org.apache.hadoop.hbase.HTableDescriptor; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.NotServingRegionException; 044import org.apache.hadoop.hbase.RegionLocations; 045import 
org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
 *
 * <p>The mini cluster runs a single region server, and the master is stopped once setup is done.
 * Tests therefore open and close the secondary replica ({@code hriSecondary}, replica id 1)
 * directly through region server RPCs. Timing between replicas is controlled with
 * {@link SlowMeCopro}.
 */
@Category({MediumTests.class, ClientTests.class})
@SuppressWarnings("deprecation")
public class TestReplicasClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicasClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);

  private static final int NB_SERVERS = 1;
  private static Table table = null;
  // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
  private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());

  /** Primary (replica id 0) region of the test table, located in {@link #beforeClass()}. */
  private static HRegionInfo hriPrimary;
  /** Hand-built replica id 1 of {@link #hriPrimary}; tests open/close it via direct RPCs. */
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  /** Store file refresh period (ms) configured on the region server in {@link #beforeClass()}. */
  private static final int REFRESH_PERIOD = 1000;

  /**
   * This copro is used to synchronize the tests.
   *
   * <p>Gets and scanner opens always go through {@link #slowdownCode}. Scanner "next" calls only
   * go through it when {@link #slowDownNext} is set.
   *
   * <p>On the primary replica (replica id 0) the call sleeps for {@link #sleepTime} ms when that
   * is positive. Otherwise it blocks on the primary latch while the latch count is non-zero.
   *
   * <p>On any other replica the call blocks on the secondary latch while its count is non-zero.
   *
   * <p>Latch waits are capped at 2 minutes. After that, a {@link RuntimeException} is thrown if
   * the latch has still not been released.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
    /** Sleep (ms) applied to primary-replica operations; 0 disables sleeping. */
    static final AtomicLong sleepTime = new AtomicLong(0);
    /** When true, scanner "next" calls are counted and the second one is slowed down. */
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    /** Number of scanner "next" calls observed (on any replica) while slowDownNext is set. */
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    private static final AtomicReference<CountDownLatch> primaryCdl =
        new AtomicReference<>(new CountDownLatch(0));
    private static final AtomicReference<CountDownLatch> secondaryCdl =
        new AtomicReference<>(new CountDownLatch(0));
    // NOTE(review): this field appears unused.
    Random r = new Random();

    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {
      slowdownCode(e);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      slowdownCode(e);
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // This will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica.
      if (slowDownNext.get()) {
        // Have some "next" return successfully from the primary; hence countOfNext is checked.
        // Note that sleepTime stays at 2000 afterwards; tests are expected to reset it.
        if (countOfNext.incrementAndGet() == 2) {
          sleepTime.set(2000);
          slowdownCode(e);
        }
      }
      return true;
    }

    /**
     * Delays the current region operation according to the replica it runs on.
     *
     * <p>Primary replica: sleep for {@link #sleepTime} ms if positive, otherwise wait on the
     * primary latch. Other replicas: wait on the secondary latch.
     */
    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      boolean isPrimary = e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0;
      try {
        if (isPrimary) {
          LOG.info("We're the primary replicas.");
          CountDownLatch latch = getPrimaryCdl().get();
          // Read once so the check and the sleep use the same value.
          long sleep = sleepTime.get();
          if (sleep > 0) {
            LOG.info("Sleeping for {} ms", sleep);
            Thread.sleep(sleep);
          } else {
            awaitLatch(latch, "Waiting for the counterCountDownLatch");
          }
        } else {
          LOG.info("We're not the primary replicas.");
          awaitLatch(getSecondaryCdl().get(), "Waiting for the secondary counterCountDownLatch");
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
        // Restore the interrupt status instead of silently clearing it.
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Waits (at most 2 minutes) for {@code latch} to be released, if it is not already.
     *
     * @param latch the latch to wait on
     * @param waitMessage message logged before waiting
     * @throws RuntimeException if the latch is still not released after the wait
     * @throws InterruptedException if interrupted while waiting
     */
    private static void awaitLatch(CountDownLatch latch, String waitMessage)
        throws InterruptedException {
      if (latch.getCount() > 0) {
        LOG.info(waitMessage);
        latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
        if (latch.getCount() > 0) {
          throw new RuntimeException("Can't wait more");
        }
      }
    }

    /** Latch that primary-replica operations wait on when {@link #sleepTime} is 0. */
    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
      return primaryCdl;
    }

    /** Latch that non-primary replica operations wait on. */
    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
      return secondaryCdl;
    }
  }

  /**
   * Starts a one-server mini cluster and creates the test table with {@link SlowMeCopro}.
   * It then records the primary region, builds the replica-1 region info and stops the master.
   */
  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
    HTU.startMiniCluster(NB_SERVERS);

    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    table = HTU.createTable(hdt, new byte[][]{f}, null);

    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
      hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
    }

    // mock a secondary region info to open
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);

    // No master
    LOG.info("Master is going to be stopped");
    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
    // NOTE(review): 'c' is never used after being configured; confirm whether a connection
    // with a single retry was intended here.
    Configuration c = new Configuration(HTU.getConfiguration());
    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    LOG.info("Master has stopped");
  }

  /** Resets the region-server transition flag, closes the table and stops the mini cluster. */
  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    if (table != null) {
      table.close();
    }
    HTU.shutdownMiniCluster();
  }

  /** Clears the client region cache and makes a best-effort attempt to open both replicas. */
  @Before
  public void before() throws IOException {
    ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionCache();
    try {
      openRegion(hriPrimary);
    } catch (Exception ignored) {
      // Best effort: setup must not fail the test here.
    }
    try {
      openRegion(hriSecondary);
    } catch (Exception ignored) {
      // Best effort: setup must not fail the test here.
    }
  }

  /** Makes a best-effort attempt to close both replicas, then clears the client region cache. */
  @After
  public void after() throws IOException, KeeperException {
    try {
      closeRegion(hriSecondary);
    } catch (Exception ignored) {
      // Best effort: the region may already be closed by the test itself.
    }
    try {
      closeRegion(hriPrimary);
    } catch (Exception ignored) {
      // Best effort: the region may already be closed.
    }

    ((ClusterConnection) HTU.getAdmin().getConnection()).clearRegionCache();
  }

  /** The single region server of the mini cluster. */
  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }

  /**
   * Opens {@code hri} on the region server via a direct OpenRegion RPC, unless it is already
   * available there. Asserts the region reports OPENED and waits for transitions to settle.
   */
  private void openRegion(HRegionInfo hri) throws Exception {
    try {
      if (isRegionOpened(hri)) {
        return;
      }
    } catch (Exception ignored) {
      // Presumably the region is not online on this server yet; fall through and open it.
    }
    // first version is '0'
    AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
        getRS().getServerName(), hri, null);
    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
    Assert.assertEquals(1, responseOpen.getOpeningStateCount());
    Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
        responseOpen.getOpeningState(0));
    checkRegionIsOpened(hri);
  }

  /** Closes {@code hri} via a direct CloseRegion RPC and asserts it is no longer available. */
  private void closeRegion(HRegionInfo hri) throws Exception {
    AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
        getRS().getServerName(), hri.getRegionName());
    AdminProtos.CloseRegionResponse responseClose = getRS()
        .getRSRpcServices().closeRegion(null, crr);
    Assert.assertTrue(responseClose.getClosed());

    checkRegionIsClosed(hri.getEncodedName());
  }

  /**
   * Busy-waits until the region server has no regions in transition.
   * {@code hri} is currently not inspected.
   */
  private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }
  }

  /** Whether {@code hri} is online and available on the region server. */
  private boolean isRegionOpened(HRegionInfo hri) throws Exception {
    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
  }

  /**
   * Waits for region transitions to settle, then asserts the region is unavailable.
   * A {@link NotServingRegionException} also counts as closed.
   */
  private void checkRegionIsClosed(String encodedRegionName) throws Exception {

    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }

    try {
      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
    } catch (NotServingRegionException expected) {
      // That's how it works: if the region is closed we get this exception.
    }

    // We don't delete the znode here, because there is not always a znode.
  }

  private void flushRegion(HRegionInfo regionInfo) throws IOException {
    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
  }

  @Test
  public void testUseRegionWithoutReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
    openRegion(hriSecondary);
    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
    try {
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testLocations() throws Exception {
    byte[] b1 = Bytes.toBytes("testLocations");
    openRegion(hriSecondary);
    ClusterConnection hc = (ClusterConnection) HTU.getAdmin().getConnection();

    try {
      hc.clearRegionCache();
      RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
      Assert.assertEquals(2, rl.size());

      rl = hc.locateRegion(table.getName(), b1, true, false);
      Assert.assertEquals(2, rl.size());

      hc.clearRegionCache();
      rl = hc.locateRegion(table.getName(), b1, true, false);
      Assert.assertEquals(2, rl.size());

      rl = hc.locateRegion(table.getName(), b1, false, false);
      Assert.assertEquals(2, rl.size());
    } finally {
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }


  @Test
  public void testGetNoResultStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
    openRegion(hriSecondary);

    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
    try {
      Get g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // We sleep; but we won't go to the stale region as we don't get the stale by default.
      SlowMeCopro.sleepTime.set(2000);
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());

    } finally {
      SlowMeCopro.sleepTime.set(0);
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testFlushTable() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriPrimary);
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testFlushPrimary() throws Exception {
    openRegion(hriSecondary);

    try {
      flushRegion(hriPrimary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testFlushSecondary() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriSecondary);
    } catch (TableNotFoundException expected) {
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testUseRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      // Even if we have to wait a little on the main region
      SlowMeCopro.sleepTime.set(2000);
      g = new Get(b1);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.sleepTime.set(0);
      LOG.info("sleep and is not stale done");

      // But if we ask for stale we will get it
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();

      LOG.info("stale done");

      // exists works and is not stale
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertTrue(r.getExists());
      LOG.info("exists not stale done");

      // exists works on stale but don't see the put
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertFalse("The secondary has stale data", r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale before flush done");

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
      LOG.info("flush done");
      Thread.sleep(1000 + REFRESH_PERIOD * 2);

      // A stale get now sees the flushed put
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertFalse(r.isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("stale done");

      // exists works on stale and we see the put after the flush
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale after flush done");

    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testHedgedRead() throws Exception {
    byte[] b1 = Bytes.toBytes("testHedgedRead");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      //reset
      ClusterConnection connection = (ClusterConnection) HTU.getConnection();
      Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
      Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
      hedgedReadOps.dec(hedgedReadOps.getCount());
      hedgedReadWin.dec(hedgedReadWin.getCount());

      // Wait a little on the main region, just enough for exactly one hedged read to happen,
      // and for the hedged read not to return faster.
      int primaryCallTimeoutMicroSecond =
          connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
      SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      Assert.assertEquals(1, hedgedReadOps.getCount());
      Assert.assertEquals(0, hedgedReadWin.getCount());
      SlowMeCopro.sleepTime.set(0);
      SlowMeCopro.getSecondaryCdl().get().countDown();
      LOG.info("hedged read occurred but not faster");


      // But if we ask for stale we will get it and hedged read returned faster
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
      Assert.assertEquals(2, hedgedReadOps.getCount());
      Assert.assertEquals(1, hedgedReadWin.getCount());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("hedged read occurred and faster");

    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.getSecondaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }

  @Ignore // Disabled because it is flakey. Fails 17% on constrained GCE, 3% on Apache.
  @Test
  public void testCancelOfMultiGet() throws Exception {
    openRegion(hriSecondary);
    try {
      List<Put> puts = new ArrayList<>(2);
      byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      puts.add(p);

      byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
      p = new Put(b2);
      p.addColumn(f, b2, b2);
      puts.add(p);
      table.put(puts);
      LOG.debug("PUT done");
      flushRegion(hriPrimary);
      LOG.info("flush done");

      Thread.sleep(1000 + REFRESH_PERIOD * 2);

      ClusterConnection connection = (ClusterConnection) HTU.getConnection();
      AsyncProcess ap = connection.getAsyncProcess();

      // Make primary slowdown
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));

      List<Get> gets = new ArrayList<>();
      Get g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      gets.add(g);
      g = new Get(b2);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      gets.add(g);
      Object[] results = new Object[2];

      int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
      int readTimeout = connection.getConnectionConfiguration().getReadRpcTimeout();
      AsyncProcessTask task = AsyncProcessTask.newBuilder()
          .setPool(HTable.getDefaultExecutor(HTU.getConfiguration()))
          .setTableName(table.getName())
          .setRowAccess(gets)
          .setResults(results)
          .setOperationTimeout(operationTimeout)
          .setRpcTimeout(readTimeout)
          .build();
      AsyncRequestFuture reqs = ap.submit(task);
      reqs.waitUntilDone();
      // verify we got the right results back
      for (Object r : results) {
        Assert.assertTrue(((Result) r).isStale());
        Assert.assertTrue(((Result) r).getExists());
      }
      Set<CancellableRegionServerCallable> set =
          ((AsyncRequestFutureImpl<?>) reqs).getCallsInProgress();
      // verify we did cancel unneeded calls
      Assert.assertFalse(set.isEmpty());
      for (CancellableRegionServerCallable m : set) {
        Assert.assertTrue(m.isCancelled());
      }
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      SlowMeCopro.slowDownNext.set(false);
      SlowMeCopro.countOfNext.set(0);
      for (int i = 0; i < 2; i++) {
        byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
        Delete d = new Delete(b1);
        table.delete(d);
      }
      closeRegion(hriSecondary);
    }
  }

  @Test
  public void testScanWithReplicas() throws Exception {
    //simple scan
    runMultipleScansOfOneType(false, false);
  }

  @Test
  public void testSmallScanWithReplicas() throws Exception {
    //small scan
    runMultipleScansOfOneType(false, true);
  }

  @Test
  public void testReverseScanWithReplicas() throws Exception {
    //reverse scan
    runMultipleScansOfOneType(true, false);
  }

  @Test
  public void testCancelOfScan() throws Exception {
    openRegion(hriSecondary);
    int NUMROWS = 100;
    try {
      for (int i = 0; i < NUMROWS; i++) {
        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
        Put p = new Put(b1);
        p.addColumn(f, b1, b1);
        table.put(p);
      }
      LOG.debug("PUT done");
      int caching = 20;
      byte[] start = Bytes.toBytes("testUseRegionWithReplica" + 0);

      flushRegion(hriPrimary);
      LOG.info("flush done");
      Thread.sleep(1000 + REFRESH_PERIOD * 2);

      // now make some 'next' calls slow
      SlowMeCopro.slowDownNext.set(true);
      SlowMeCopro.countOfNext.set(0);
      SlowMeCopro.sleepTime.set(5000);

      Scan scan = new Scan(start);
      scan.setCaching(caching);
      scan.setConsistency(Consistency.TIMELINE);
      // Close the scanner explicitly; it is not exhausted here, so it would otherwise stay open
      // on the server until its lease expires.
      try (ResultScanner scanner = table.getScanner(scan)) {
        Iterator<Result> iter = scanner.iterator();
        iter.next();
        Assert.assertTrue(((ClientScanner) scanner).isAnyRPCcancelled());
        SlowMeCopro.slowDownNext.set(false);
        SlowMeCopro.countOfNext.set(0);
      }
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      SlowMeCopro.slowDownNext.set(false);
      SlowMeCopro.countOfNext.set(0);
      for (int i = 0; i < NUMROWS; i++) {
        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
        Delete d = new Delete(b1);
        table.delete(d);
      }
      closeRegion(hriSecondary);
    }
  }

  /**
   * Loads rows into the table and runs a series of scans of one kind (forward/reverse,
   * small/normal). The scans cover STRONG and TIMELINE consistency, with the primary slowed via
   * sleeps or slow "next" calls.
   */
  private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
    openRegion(hriSecondary);
    int NUMROWS = 100;
    int NUMCOLS = 10;
    try {
      for (int i = 0; i < NUMROWS; i++) {
        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
        for (int col = 0; col < NUMCOLS; col++) {
          Put p = new Put(b1);
          String qualifier = "qualifer" + col;
          KeyValue kv = new KeyValue(b1, f, Bytes.toBytes(qualifier));
          p.add(kv);
          table.put(p);
        }
      }
      LOG.debug("PUT done");
      int caching = 20;
      long maxResultSize = Long.MAX_VALUE;

      byte[] start = reversed
          ? Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1))
          : Bytes.toBytes("testUseRegionWithReplica" + 0);

      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
          start, NUMROWS, NUMCOLS, false, false);

      // Even if we were to slow the server down, unless we ask for stale
      // we won't get it
      SlowMeCopro.sleepTime.set(5000);
      scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
          NUMCOLS, false, false);
      SlowMeCopro.sleepTime.set(0);

      flushRegion(hriPrimary);
      LOG.info("flush done");
      Thread.sleep(1000 + REFRESH_PERIOD * 2);

      //Now set the flag to get a response even if stale
      SlowMeCopro.sleepTime.set(5000);
      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
          start, NUMROWS, NUMCOLS, true, false);
      SlowMeCopro.sleepTime.set(0);

      // now make some 'next' calls slow
      SlowMeCopro.slowDownNext.set(true);
      SlowMeCopro.countOfNext.set(0);
      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
          NUMROWS, NUMCOLS, true, true);
      SlowMeCopro.slowDownNext.set(false);
      SlowMeCopro.countOfNext.set(0);

      // Make sure we do not get stale data..
      SlowMeCopro.sleepTime.set(5000);
      scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
          start, NUMROWS, NUMCOLS, false, false);
      SlowMeCopro.sleepTime.set(0);

      // While the next calls are slow, set maxResultSize to 1 so that some partial results will be
      // returned from the server before the replica switch occurs.
      maxResultSize = 1;
      SlowMeCopro.slowDownNext.set(true);
      SlowMeCopro.countOfNext.set(0);
      scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
          NUMROWS, NUMCOLS, true, true);
      maxResultSize = Long.MAX_VALUE;
      SlowMeCopro.slowDownNext.set(false);
      SlowMeCopro.countOfNext.set(0);
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      SlowMeCopro.slowDownNext.set(false);
      SlowMeCopro.countOfNext.set(0);
      for (int i = 0; i < NUMROWS; i++) {
        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
        Delete d = new Delete(b1);
        table.delete(d);
      }
      closeRegion(hriSecondary);
    }
  }

  /**
   * Runs one scan and checks its results.
   *
   * <p>Every row must be returned exactly once, and the row and cell counts must match
   * {@code numRows} and {@code numRows * numCols}.
   *
   * <p>Without {@code slowNext}, each row's staleness must equal {@code staleExpected}. With
   * {@code slowNext}, more than one row must be stale. When {@code maxResultSize} is not
   * {@link Long#MAX_VALUE}, at most {@code numRows} rows may be stale; otherwise fewer than
   * {@code numRows}.
   */
  private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
      int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
      boolean staleExpected, boolean slowNext)
      throws Exception {
    Scan scan = new Scan(startRow);
    scan.setCaching(caching);
    scan.setMaxResultSize(maxResultSize);
    scan.setReversed(reversed);
    scan.setSmall(small);
    scan.setConsistency(consistency);

    // Row keys that we have seen so far
    HashMap<String, Boolean> seenRows = new HashMap<>();

    // Tracked metrics
    int rowCount = 0;
    int cellCount = 0;
    int countOfStale = 0;

    // try-with-resources so the scanner is always released, even when an assertion fails.
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result r : scanner) {
        rowCount++;
        String rowKey = Bytes.toString(r.getRow());

        if (seenRows.containsKey(rowKey)) {
          throw new Exception("Unexpected scan result. Repeated row " + rowKey);
        }
        seenRows.put(rowKey, true);

        cellCount += r.size();

        if (!slowNext) {
          Assert.assertEquals(staleExpected, r.isStale());
        }
        if (r.isStale()) {
          countOfStale++;
        }
      }
    }
    Assert.assertEquals("Count of rows " + rowCount + " num rows expected " + numRows,
        numRows, rowCount);
    Assert.assertEquals("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
        numRows * numCols, cellCount);

    if (slowNext) {
      LOG.debug("Count of Stale {}", countOfStale);
      Assert.assertTrue(countOfStale > 1);

      // If the scan was configured in such a way that a full row was NOT retrieved before the
      // replica switch occurred, then it is possible that all rows were stale
      if (maxResultSize != Long.MAX_VALUE) {
        Assert.assertTrue(countOfStale <= numRows);
      } else {
        Assert.assertTrue(countOfStale < numRows);
      }
    }
  }
}