001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.Callable; 030import org.apache.commons.lang3.exception.ExceptionUtils; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellComparator; 036import org.apache.hadoop.hbase.CellComparatorImpl; 037import org.apache.hadoop.hbase.CellUtil; 038import org.apache.hadoop.hbase.ExtendedCell; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HTestConst; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; 045import org.apache.hadoop.hbase.client.AsyncConnection; 046import org.apache.hadoop.hbase.client.AsyncTable; 047import org.apache.hadoop.hbase.client.ConnectionFactory; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.Result; 051import org.apache.hadoop.hbase.client.ResultScanner; 052import org.apache.hadoop.hbase.client.Scan; 053import org.apache.hadoop.hbase.client.ScanPerNextResultScanner; 054import org.apache.hadoop.hbase.client.Table; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.filter.Filter; 057import org.apache.hadoop.hbase.filter.FilterBase; 058import org.apache.hadoop.hbase.ipc.HBaseRpcController; 059import org.apache.hadoop.hbase.ipc.RpcCall; 060import org.apache.hadoop.hbase.testclassification.LargeTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 063import org.apache.hadoop.hbase.util.Threads; 064import org.apache.hadoop.hbase.wal.WAL; 065import org.junit.jupiter.api.AfterAll; 066import org.junit.jupiter.api.AfterEach; 067import org.junit.jupiter.api.BeforeAll; 068import org.junit.jupiter.api.BeforeEach; 069import org.junit.jupiter.api.Tag; 070import org.junit.jupiter.api.Test; 071import org.mockito.Mockito; 072 073import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 074import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 075import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 076 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 079 080/** 081 * Here we test to make sure that scans return the expected Results when the server is sending the 082 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent 083 * the scanner on the client side from timing out). A heartbeat message is sent from the server to 084 * the client when the server has exceeded the time limit during the processing of the scan. When 085 * the time limit is reached, the server will return to the Client whatever Results it has 086 * accumulated (potentially empty). 087 */ 088@Tag(LargeTests.TAG) 089public class TestScannerHeartbeatMessages { 090 091 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 092 093 private static AsyncConnection CONN; 094 095 /** 096 * Table configuration 097 */ 098 private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); 099 100 private static int NUM_ROWS = 5; 101 private static byte[] ROW = Bytes.toBytes("testRow"); 102 private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); 103 104 private static int NUM_FAMILIES = 4; 105 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 106 private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); 107 108 private static int NUM_QUALIFIERS = 3; 109 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 110 private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); 111 112 private static int VALUE_SIZE = 128; 113 private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); 114 115 // The time limit should be based on the rpc timeout at client, or the client will regards 116 // the request as timeout before server return a heartbeat. 117 private static int SERVER_TIMEOUT = 60000; 118 119 // Time, in milliseconds, that the client will wait for a response from the server before timing 120 // out. This value is used server side to determine when it is necessary to send a heartbeat 121 // message to the client. Time limit will be 500 ms. 122 private static int CLIENT_TIMEOUT = 1000; 123 124 // In this test, we sleep after reading each row. So we should make sure after we get some number 125 // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping. 126 private static int DEFAULT_ROW_SLEEP_TIME = 300; 127 128 // Similar with row sleep time. 129 private static int DEFAULT_CF_SLEEP_TIME = 300; 130 131 @BeforeAll 132 public static void setUpBeforeClass() throws Exception { 133 Configuration conf = TEST_UTIL.getConfiguration(); 134 135 conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); 136 conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); 137 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); 138 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); 139 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); 140 141 // Check the timeout condition after every cell 142 conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); 143 TEST_UTIL.startMiniCluster(1); 144 145 createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); 146 147 Configuration newConf = new Configuration(conf); 148 newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); 149 newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); 150 CONN = ConnectionFactory.createAsyncConnection(newConf).get(); 151 } 152 153 @Test 154 public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException { 155 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); 156 RSRpcServices services = new RSRpcServices(rs); 157 RpcCall mockRpcCall = Mockito.mock(RpcCall.class); 158 // first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing) 159 // finally, 25 is fatal levels of queueing -- exceeding timeout 160 when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L); 161 162 // assume timeout of 100ms 163 HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class); 164 when(mockController.getCallTimeout()).thenReturn(100); 165 166 // current time is 100, which we'll subtract from 90 and 50 to generate some time deltas 167 EnvironmentEdgeManager.injectEdge(() -> 200L); 168 169 try { 170 // we queued for 20ms, leaving 80ms of timeout, which we divide by 2 171 assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true)); 172 // we queued for 80ms, leaving 20ms of timeout, which we divide by 2 173 assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true)); 174 // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum 175 assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 176 services.getTimeLimit(mockRpcCall, mockController, true)); 177 // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop 178 // timed out calls in the queue. in this case we still fallback on default minimum for now. 179 assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 180 services.getTimeLimit(mockRpcCall, mockController, true)); 181 } finally { 182 EnvironmentEdgeManager.reset(); 183 } 184 185 } 186 187 static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers, 188 byte[] cellValue) throws IOException { 189 Table ht = TEST_UTIL.createTable(name, families); 190 List<Put> puts = createPuts(rows, families, qualifiers, cellValue); 191 ht.put(puts); 192 } 193 194 /** 195 * Make puts to put the input value into each combination of row, family, and qualifier 196 */ 197 static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, 198 byte[] value) throws IOException { 199 Put put; 200 ArrayList<Put> puts = new ArrayList<>(); 201 202 for (int row = 0; row < rows.length; row++) { 203 put = new Put(rows[row]); 204 for (int fam = 0; fam < families.length; fam++) { 205 for (int qual = 0; qual < qualifiers.length; qual++) { 206 KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); 207 put.add(kv); 208 } 209 } 210 puts.add(put); 211 } 212 213 return puts; 214 } 215 216 @AfterAll 217 public static void tearDownAfterClass() throws Exception { 218 Closeables.close(CONN, true); 219 TEST_UTIL.shutdownMiniCluster(); 220 } 221 222 @BeforeEach 223 public void setupBeforeTest() throws Exception { 224 disableSleeping(); 225 } 226 227 @AfterEach 228 public void teardownAfterTest() throws Exception { 229 disableSleeping(); 230 } 231 232 /** 233 * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass 234 * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are 235 * disabled, the test should throw an exception. 236 */ 237 private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException { 238 HeartbeatRPCServices.heartbeatsEnabled = true; 239 240 try { 241 testCallable.call(); 242 } catch (Exception e) { 243 fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:" 244 + ExceptionUtils.getStackTrace(e)); 245 } 246 247 HeartbeatRPCServices.heartbeatsEnabled = false; 248 try { 249 testCallable.call(); 250 } catch (Exception e) { 251 return; 252 } finally { 253 HeartbeatRPCServices.heartbeatsEnabled = true; 254 } 255 fail("Heartbeats messages are disabled, an exception should be thrown. If an exception " 256 + " is not thrown, the test case is not testing the importance of heartbeat messages"); 257 } 258 259 /** 260 * Test the case that the time limit for the scan is reached after each full row of cells is 261 * fetched. 262 */ 263 @Test 264 public void testHeartbeatBetweenRows() throws Exception { 265 testImportanceOfHeartbeats(new Callable<Void>() { 266 267 @Override 268 public Void call() throws Exception { 269 // Configure the scan so that it can read the entire table in a single RPC. We want to test 270 // the case where a scan stops on the server side due to a time limit 271 Scan scan = new Scan(); 272 scan.setMaxResultSize(Long.MAX_VALUE); 273 scan.setCaching(Integer.MAX_VALUE); 274 275 testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false); 276 return null; 277 } 278 }); 279 } 280 281 /** 282 * Test the case that the time limit for scans is reached in between column families 283 */ 284 @Test 285 public void testHeartbeatBetweenColumnFamilies() throws Exception { 286 testImportanceOfHeartbeats(new Callable<Void>() { 287 @Override 288 public Void call() throws Exception { 289 // Configure the scan so that it can read the entire table in a single RPC. We want to test 290 // the case where a scan stops on the server side due to a time limit 291 Scan baseScan = new Scan(); 292 baseScan.setMaxResultSize(Long.MAX_VALUE); 293 baseScan.setCaching(Integer.MAX_VALUE); 294 295 // Copy the scan before each test. When a scan object is used by a scanner, some of its 296 // fields may be changed such as start row 297 Scan scanCopy = new Scan(baseScan); 298 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false); 299 scanCopy = new Scan(baseScan); 300 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true); 301 return null; 302 } 303 }); 304 } 305 306 public static class SparseCellFilter extends FilterBase { 307 308 @Override 309 public ReturnCode filterCell(final Cell v) throws IOException { 310 try { 311 Thread.sleep(CLIENT_TIMEOUT / 2 + 100); 312 } catch (InterruptedException e) { 313 Thread.currentThread().interrupt(); 314 } 315 return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) 316 ? ReturnCode.INCLUDE 317 : ReturnCode.SKIP; 318 } 319 320 public static Filter parseFrom(final byte[] pbBytes) { 321 return new SparseCellFilter(); 322 } 323 } 324 325 public static class SparseRowFilter extends FilterBase { 326 327 @Override 328 public boolean filterRowKey(Cell cell) throws IOException { 329 try { 330 Thread.sleep(CLIENT_TIMEOUT / 2 - 100); 331 } catch (InterruptedException e) { 332 Thread.currentThread().interrupt(); 333 } 334 return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]); 335 } 336 337 public static Filter parseFrom(final byte[] pbBytes) { 338 return new SparseRowFilter(); 339 } 340 } 341 342 /** 343 * Test the case that there is a filter which filters most of cells 344 */ 345 @Test 346 public void testHeartbeatWithSparseCellFilter() throws Exception { 347 testImportanceOfHeartbeats(new Callable<Void>() { 348 @Override 349 public Void call() throws Exception { 350 Scan scan = new Scan(); 351 scan.setMaxResultSize(Long.MAX_VALUE); 352 scan.setCaching(Integer.MAX_VALUE); 353 scan.setFilter(new SparseCellFilter()); 354 try (ScanPerNextResultScanner scanner = 355 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { 356 int num = 0; 357 while (scanner.next() != null) { 358 num++; 359 } 360 assertEquals(1, num); 361 } 362 363 scan = new Scan(); 364 scan.setMaxResultSize(Long.MAX_VALUE); 365 scan.setCaching(Integer.MAX_VALUE); 366 scan.setFilter(new SparseCellFilter()); 367 scan.setAllowPartialResults(true); 368 try (ScanPerNextResultScanner scanner = 369 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { 370 int num = 0; 371 while (scanner.next() != null) { 372 num++; 373 } 374 assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); 375 } 376 377 return null; 378 } 379 }); 380 } 381 382 /** 383 * Test the case that there is a filter which filters most of rows 384 */ 385 @Test 386 public void testHeartbeatWithSparseRowFilter() throws Exception { 387 testImportanceOfHeartbeats(new Callable<Void>() { 388 @Override 389 public Void call() throws Exception { 390 Scan scan = new Scan(); 391 scan.setMaxResultSize(Long.MAX_VALUE); 392 scan.setCaching(Integer.MAX_VALUE); 393 scan.setFilter(new SparseRowFilter()); 394 try (ScanPerNextResultScanner scanner = 395 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { 396 int num = 0; 397 while (scanner.next() != null) { 398 num++; 399 } 400 assertEquals(1, num); 401 } 402 403 return null; 404 } 405 }); 406 } 407 408 /** 409 * Test the equivalence of a scan versus the same scan executed when heartbeat messages are 410 * necessary 411 * @param scan The scan configuration being tested 412 * @param rowSleepTime The time to sleep between fetches of row cells 413 * @param cfSleepTime The time to sleep between fetches of column family cells 414 * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for 415 * that column family are fetched 416 */ 417 private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, 418 int cfSleepTime, boolean sleepBeforeCf) throws Exception { 419 disableSleeping(); 420 AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME); 421 final ResultScanner scanner = new ScanPerNextResultScanner(table, scan); 422 final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan); 423 424 Result r1 = null; 425 Result r2 = null; 426 427 while ((r1 = scanner.next()) != null) { 428 // Enforce the specified sleep conditions during calls to the heartbeat scanner 429 configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf); 430 r2 = scannerWithHeartbeats.next(); 431 disableSleeping(); 432 433 assertTrue(r2 != null); 434 try { 435 Result.compareResults(r1, r2); 436 } catch (Exception e) { 437 fail(e.getMessage()); 438 } 439 } 440 441 assertTrue(scannerWithHeartbeats.next() == null); 442 scanner.close(); 443 scannerWithHeartbeats.close(); 444 } 445 446 /** 447 * Helper method for setting the time to sleep between rows and column families. If a sleep time 448 * is negative then that sleep will be disabled 449 */ 450 private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) { 451 HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0; 452 HeartbeatHRegion.rowSleepTime = rowSleepTime; 453 454 HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0; 455 HeartbeatHRegion.columnFamilySleepTime = cfSleepTime; 456 HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf; 457 } 458 459 /** 460 * Disable the sleeping mechanism server side. 461 */ 462 private static void disableSleeping() { 463 HeartbeatHRegion.sleepBetweenRows = false; 464 HeartbeatHRegion.sleepBetweenColumnFamilies = false; 465 } 466 467 /** 468 * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of 469 * {@link RSRpcServices} to allow us to toggle support for heartbeat messages 470 */ 471 private static class HeartbeatHRegionServer extends HRegionServer { 472 public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException { 473 super(conf); 474 } 475 476 @Override 477 protected RSRpcServices createRpcServices() throws IOException { 478 return new HeartbeatRPCServices(this); 479 } 480 } 481 482 /** 483 * Custom RSRpcServices instance that allows heartbeat support to be toggled 484 */ 485 private static class HeartbeatRPCServices extends RSRpcServices { 486 private static volatile boolean heartbeatsEnabled = true; 487 488 public HeartbeatRPCServices(HRegionServer rs) throws IOException { 489 super(rs); 490 } 491 492 @Override 493 public ScanResponse scan(RpcController controller, ScanRequest request) 494 throws ServiceException { 495 ScanRequest.Builder builder = ScanRequest.newBuilder(request); 496 builder.setClientHandlesHeartbeats(heartbeatsEnabled); 497 return super.scan(controller, builder.build()); 498 } 499 } 500 501 /** 502 * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times 503 * between fetches of row Results and/or column family cells. Useful for emulating an instance 504 * where the server is taking a long time to process a client's scan request 505 */ 506 private static class HeartbeatHRegion extends HRegion { 507 // Row sleeps occur AFTER each row worth of cells is retrieved. 508 private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; 509 private static volatile boolean sleepBetweenRows = false; 510 511 // The sleep for column families can be initiated before or after we fetch the cells for the 512 // column family. If the sleep occurs BEFORE then the time limits will be reached inside 513 // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time 514 // limit will be reached inside RegionScanner after all the cells for a column family have been 515 // retrieved. 516 private static volatile boolean sleepBeforeColumnFamily = false; 517 private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; 518 private static volatile boolean sleepBetweenColumnFamilies = false; 519 520 public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 521 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 522 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 523 } 524 525 public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, 526 TableDescriptor htd, RegionServerServices rsServices) { 527 super(fs, wal, confParam, htd, rsServices); 528 } 529 530 private static void columnFamilySleep() { 531 if (sleepBetweenColumnFamilies) { 532 Threads.sleepWithoutInterrupt(columnFamilySleepTime); 533 } 534 } 535 536 private static void rowSleep() { 537 if (sleepBetweenRows) { 538 Threads.sleepWithoutInterrupt(rowSleepTime); 539 } 540 } 541 542 // Instantiate the custom heartbeat region scanners 543 @Override 544 protected RegionScannerImpl instantiateRegionScanner(Scan scan, 545 List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException { 546 if (scan.isReversed()) { 547 if (scan.getFilter() != null) { 548 scan.getFilter().setReversed(true); 549 } 550 return new HeartbeatReversedRegionScanner(scan, additionalScanners, this); 551 } 552 return new HeartbeatRegionScanner(scan, additionalScanners, this); 553 } 554 } 555 556 /** 557 * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results 558 * and/or column family cells 559 */ 560 private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl { 561 HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, 562 HRegion region) throws IOException { 563 super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE); 564 } 565 566 @Override 567 public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext context) 568 throws IOException { 569 boolean moreRows = super.nextRaw(outResults, context); 570 HeartbeatHRegion.rowSleep(); 571 return moreRows; 572 } 573 574 @Override 575 protected void initializeKVHeap(List<KeyValueScanner> scanners, 576 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 577 this.storeHeap = 578 new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator()); 579 if (!joinedScanners.isEmpty()) { 580 this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, 581 (CellComparatorImpl) region.getCellComparator()); 582 } 583 } 584 } 585 586 /** 587 * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or 588 * column family cells 589 */ 590 private static class HeartbeatRegionScanner extends RegionScannerImpl { 591 HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) 592 throws IOException { 593 super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE); 594 } 595 596 @Override 597 public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext context) 598 throws IOException { 599 boolean moreRows = super.nextRaw(outResults, context); 600 HeartbeatHRegion.rowSleep(); 601 return moreRows; 602 } 603 604 @Override 605 protected void initializeKVHeap(List<KeyValueScanner> scanners, 606 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 607 this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator()); 608 if (!joinedScanners.isEmpty()) { 609 this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator()); 610 } 611 } 612 } 613 614 /** 615 * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family 616 * cells. Useful for testing 617 */ 618 private static final class HeartbeatKVHeap extends KeyValueHeap { 619 public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 620 throws IOException { 621 super(scanners, comparator); 622 } 623 624 HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator) 625 throws IOException { 626 super(scanners, comparator); 627 } 628 629 @Override 630 public boolean next(List<? super ExtendedCell> result, ScannerContext context) 631 throws IOException { 632 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 633 boolean moreRows = super.next(result, context); 634 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 635 return moreRows; 636 } 637 } 638 639 /** 640 * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family 641 * cells. 642 */ 643 private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap { 644 public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners, 645 CellComparatorImpl comparator) throws IOException { 646 super(scanners, comparator); 647 } 648 649 @Override 650 public boolean next(List<? super ExtendedCell> result, ScannerContext context) 651 throws IOException { 652 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 653 boolean moreRows = super.next(result, context); 654 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 655 return moreRows; 656 } 657 } 658}