/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Here we test to make sure that scans return the expected Results when the server is sending the
 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
 * the client when the server has exceeded the time limit during the processing of the scan. When
 * the time limit is reached, the server will return to the Client whatever Results it has
 * accumulated (potentially empty).
 */
@Category(LargeTests.class)
public class TestScannerHeartbeatMessages {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Client connection configured with the short CLIENT_TIMEOUT; created in setUpBeforeClass().
  private static AsyncConnection CONN;

  /**
   * Table configuration
   */
  private static final TableName TABLE_NAME =
    TableName.valueOf("testScannerHeartbeatMessagesTable");

  private static final int NUM_ROWS = 5;
  private static final byte[] ROW = Bytes.toBytes("testRow");
  private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);

  private static final int NUM_FAMILIES = 4;
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
  private static final byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);

  private static final int NUM_QUALIFIERS = 3;
  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static final byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);

  private static final int VALUE_SIZE = 128;
  private static final byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);

  // The time limit should be based on the rpc timeout at the client, or the client will regard
  // the request as timed out before the server returns a heartbeat.
  private static final int SERVER_TIMEOUT = 60000;

  // Time, in milliseconds, that the client will wait for a response from the server before timing
  // out. This value is used server side to determine when it is necessary to send a heartbeat
  // message to the client. Time limit will be 500 ms.
  private static final int CLIENT_TIMEOUT = 1000;

  // In this test, we sleep after reading each row. So we should make sure that after reading some
  // number of rows (and sleeping the same number of times) we reach the time limit, without
  // timing out on the following sleep.
  private static final int DEFAULT_ROW_SLEEP_TIME = 300;

  // Similar to the row sleep time.
  private static final int DEFAULT_CF_SLEEP_TIME = 300;

  /**
   * Starts a single-node mini cluster that uses the heartbeat-aware region and region server
   * implementations, loads the test table, and opens the short-timeout client connection.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();

    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);

    // Check the timeout condition after every cell
    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
    TEST_UTIL.startMiniCluster(1);

    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);

    // The client used by the tests gets its own, much shorter, timeouts.
    Configuration newConf = new Configuration(conf);
    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
  }

  /**
   * Verifies that the scan time limit computed by {@link RSRpcServices} shrinks with the time a
   * call spent queued, and falls back to the default minimum delta when little or no timeout
   * budget remains.
   */
  @Test
  public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException {
    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
    RSRpcServices services = new RSRpcServices(rs);
    RpcCall mockRpcCall = Mockito.mock(RpcCall.class);
    // first return 180 (minimal queueing), then 120 (more queueing), then 101 (heavy queueing)
    // finally, 25 is fatal levels of queueing -- exceeding timeout
    when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L);

    // assume timeout of 100ms
    HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class);
    when(mockController.getCallTimeout()).thenReturn(100);

    // current time is pinned at 200; subtracting the receive times above (180, 120, 101, 25)
    // gives queue times of 20, 80, 99 and 175 ms respectively
    EnvironmentEdgeManager.injectEdge(() -> 200L);

    try {
      // we queued for 20ms, leaving 80ms of timeout, which we divide by 2
      assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
      // we queued for 80ms, leaving 20ms of timeout, which we divide by 2
      assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
      // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum
      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
        services.getTimeLimit(mockRpcCall, mockController, true));
      // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop
      // timed out calls in the queue. in this case we still fallback on default minimum for now.
      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
        services.getTimeLimit(mockRpcCall, mockController, true));
    } finally {
      // Always restore the real clock so later tests are not affected by the injected edge.
      EnvironmentEdgeManager.reset();
    }
  }

  /**
   * Creates the table and fills it with one cell per combination of row, family and qualifier.
   * @param name       name of the table to create
   * @param rows       row keys to write
   * @param families   column families of the table
   * @param qualifiers qualifiers written in every family
   * @param cellValue  value stored in every cell
   * @throws IOException if the table cannot be created or the puts fail
   */
  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
    byte[] cellValue) throws IOException {
    // The Table handle is only needed for loading data; close it instead of leaking it.
    try (Table ht = TEST_UTIL.createTable(name, families)) {
      List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
      ht.put(puts);
    }
  }

  /**
   * Make puts to put the input value into each combination of row, family, and qualifier
   * @param rows       row keys, one {@link Put} is produced per row
   * @param families   column families to write
   * @param qualifiers qualifiers to write; the qualifier index is used as the cell timestamp
   * @param value      value stored in every cell
   * @return the puts, in row order
   * @throws IOException if a cell cannot be added to its put
   */
  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
    byte[] value) throws IOException {
    ArrayList<Put> puts = new ArrayList<>(rows.length);

    for (byte[] row : rows) {
      Put put = new Put(row);
      for (byte[] family : families) {
        for (int qual = 0; qual < qualifiers.length; qual++) {
          KeyValue kv = new KeyValue(row, family, qualifiers[qual], qual, value);
          put.add(kv);
        }
      }
      puts.add(put);
    }

    return puts;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    Closeables.close(CONN, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setupBeforeTest() throws Exception {
    disableSleeping();
  }

  @After
  public void teardownAfterTest() throws Exception {
    disableSleeping();
  }

  /**
   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
   * disabled, the test should throw an exception.
   * @param testCallable the scenario to run once with and once without heartbeat support
   */
  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
    HeartbeatRPCServices.heartbeatsEnabled = true;

    try {
      testCallable.call();
    } catch (Exception e) {
      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
        + ExceptionUtils.getStackTrace(e));
    }

    HeartbeatRPCServices.heartbeatsEnabled = false;
    try {
      testCallable.call();
    } catch (Exception e) {
      // Expected: without heartbeats the client must time out.
      return;
    } finally {
      // Never leave heartbeats disabled for the next test.
      HeartbeatRPCServices.heartbeatsEnabled = true;
    }
    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
      + " is not thrown, the test case is not testing the importance of heartbeat messages");
  }

  /**
   * Test the case that the time limit for the scan is reached after each full row of cells is
   * fetched.
   */
  @Test
  public void testHeartbeatBetweenRows() throws Exception {
    testImportanceOfHeartbeats(() -> {
      // Configure the scan so that it can read the entire table in a single RPC. We want to test
      // the case where a scan stops on the server side due to a time limit
      Scan scan = new Scan();
      scan.setMaxResultSize(Long.MAX_VALUE);
      scan.setCaching(Integer.MAX_VALUE);

      testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
      return null;
    });
  }

  /**
   * Test the case that the time limit for scans is reached in between column families
   */
  @Test
  public void testHeartbeatBetweenColumnFamilies() throws Exception {
    testImportanceOfHeartbeats(() -> {
      // Configure the scan so that it can read the entire table in a single RPC. We want to test
      // the case where a scan stops on the server side due to a time limit
      Scan baseScan = new Scan();
      baseScan.setMaxResultSize(Long.MAX_VALUE);
      baseScan.setCaching(Integer.MAX_VALUE);

      // Copy the scan before each test. When a scan object is used by a scanner, some of its
      // fields may be changed such as start row
      Scan scanCopy = new Scan(baseScan);
      testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
      scanCopy = new Scan(baseScan);
      testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
      return null;
    });
  }

  /**
   * Filter that sleeps for more than half of {@link #CLIENT_TIMEOUT} on every cell and only
   * includes cells that belong to the last test row.
   */
  public static class SparseCellFilter extends FilterBase {

    @Override
    public ReturnCode filterCell(final Cell v) throws IOException {
      try {
        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1])
        ? ReturnCode.INCLUDE
        : ReturnCode.SKIP;
    }

    public static Filter parseFrom(final byte[] pbBytes) {
      return new SparseCellFilter();
    }
  }

  /**
   * Filter that sleeps for a little less than half of {@link #CLIENT_TIMEOUT} on every row key
   * and filters out every row except the last test row.
   */
  public static class SparseRowFilter extends FilterBase {

    @Override
    public boolean filterRowKey(Cell cell) throws IOException {
      try {
        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
    }

    public static Filter parseFrom(final byte[] pbBytes) {
      return new SparseRowFilter();
    }
  }

  /**
   * Runs the scan to completion against the test table and counts the Results returned to the
   * client. The scanner is always closed, even when the scan fails.
   * @param scan the scan to execute
   * @return number of non-null Results returned by the scanner
   * @throws IOException if the scan fails, e.g. because the client timed out
   */
  private static int countScanResults(Scan scan) throws IOException {
    try (ScanPerNextResultScanner scanner =
      new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
      int num = 0;
      while (scanner.next() != null) {
        num++;
      }
      return num;
    }
  }

  /**
   * Test the case that there is a filter which filters most of cells
   */
  @Test
  public void testHeartbeatWithSparseCellFilter() throws Exception {
    testImportanceOfHeartbeats(() -> {
      Scan scan = new Scan();
      scan.setMaxResultSize(Long.MAX_VALUE);
      scan.setCaching(Integer.MAX_VALUE);
      scan.setFilter(new SparseCellFilter());
      // Only the last row passes the filter, so exactly one complete Result is expected.
      assertEquals(1, countScanResults(scan));

      scan = new Scan();
      scan.setMaxResultSize(Long.MAX_VALUE);
      scan.setCaching(Integer.MAX_VALUE);
      scan.setFilter(new SparseCellFilter());
      scan.setAllowPartialResults(true);
      // With partial results allowed, one Result per included cell of the last row is expected.
      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, countScanResults(scan));

      return null;
    });
  }

  /**
   * Test the case that there is a filter which filters most of rows
   */
  @Test
  public void testHeartbeatWithSparseRowFilter() throws Exception {
    testImportanceOfHeartbeats(() -> {
      Scan scan = new Scan();
      scan.setMaxResultSize(Long.MAX_VALUE);
      scan.setCaching(Integer.MAX_VALUE);
      scan.setFilter(new SparseRowFilter());
      assertEquals(1, countScanResults(scan));

      return null;
    });
  }

  /**
   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
   * necessary
   * @param scan          The scan configuration being tested
   * @param rowSleepTime  The time to sleep between fetches of row cells
   * @param cfSleepTime   The time to sleep between fetches of column family cells
   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
   *                      that column family are fetched
   */
  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
    int cfSleepTime, boolean sleepBeforeCf) throws Exception {
    disableSleeping();
    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);

    // try-with-resources: when heartbeats are disabled the second scanner is EXPECTED to throw,
    // and both scanners must still be closed in that case.
    try (ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
      ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan)) {
      Result r1;
      while ((r1 = scanner.next()) != null) {
        Result r2;
        // Enforce the specified sleep conditions during calls to the heartbeat scanner
        configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
        try {
          r2 = scannerWithHeartbeats.next();
        } finally {
          // Switch sleeping off again even if the call above timed out, so that the reference
          // scanner (and anything running afterwards) is not slowed down.
          disableSleeping();
        }

        assertTrue(r2 != null);
        try {
          Result.compareResults(r1, r2);
        } catch (Exception e) {
          // Must surface as an AssertionError (not an Exception) so that
          // testImportanceOfHeartbeats does not mistake a mismatch for the expected timeout;
          // keep the cause for diagnosis.
          throw new AssertionError(e.getMessage(), e);
        }
      }

      assertTrue(scannerWithHeartbeats.next() == null);
    }
  }

  /**
   * Helper method for setting the time to sleep between rows and column families. If a sleep time
   * is negative then that sleep will be disabled
   */
  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
    HeartbeatHRegion.rowSleepTime = rowSleepTime;

    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
  }

  /**
   * Disable the sleeping mechanism server side.
   */
  private static void disableSleeping() {
    HeartbeatHRegion.sleepBetweenRows = false;
    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
  }

  /**
   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
   */
  private static class HeartbeatHRegionServer extends HRegionServer {
    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new HeartbeatRPCServices(this);
    }
  }

  /**
   * Custom RSRpcServices instance that allows heartbeat support to be toggled
   */
  private static class HeartbeatRPCServices extends RSRpcServices {
    private static volatile boolean heartbeatsEnabled = true;

    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
      super(rs);
    }

    @Override
    public ScanResponse scan(RpcController controller, ScanRequest request)
      throws ServiceException {
      // Rewrite the request so the "client handles heartbeats" flag follows the test toggle.
      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
      return super.scan(controller, builder.build());
    }
  }

  /**
   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
   * between fetches of row Results and/or column family cells. Useful for emulating an instance
   * where the server is taking a long time to process a client's scan request
   */
  private static class HeartbeatHRegion extends HRegion {
    // Row sleeps occur AFTER each row worth of cells is retrieved.
    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
    private static volatile boolean sleepBetweenRows = false;

    // The sleep for column families can be initiated before or after we fetch the cells for the
    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
    // limit will be reached inside RegionScanner after all the cells for a column family have been
    // retrieved.
    private static volatile boolean sleepBeforeColumnFamily = false;
    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
    private static volatile boolean sleepBetweenColumnFamilies = false;

    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
    }

    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
      TableDescriptor htd, RegionServerServices rsServices) {
      super(fs, wal, confParam, htd, rsServices);
    }

    /** Sleeps for the configured column family sleep time, if that sleep is enabled. */
    private static void columnFamilySleep() {
      if (sleepBetweenColumnFamilies) {
        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
      }
    }

    /** Sleeps for the configured row sleep time, if that sleep is enabled. */
    private static void rowSleep() {
      if (sleepBetweenRows) {
        Threads.sleepWithoutInterrupt(rowSleepTime);
      }
    }

    // Instantiate the custom heartbeat region scanners
    @Override
    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
      if (scan.isReversed()) {
        if (scan.getFilter() != null) {
          scan.getFilter().setReversed(true);
        }
        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
      }
      return new HeartbeatRegionScanner(scan, additionalScanners, this);
    }
  }

  /**
   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
   * and/or column family cells
   */
  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
      HRegion region) throws IOException {
      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
    }

    @Override
    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
      boolean moreRows = super.nextRaw(outResults, context);
      HeartbeatHRegion.rowSleep();
      return moreRows;
    }

    @Override
    protected void initializeKVHeap(List<KeyValueScanner> scanners,
      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
      this.storeHeap =
        new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
      if (!joinedScanners.isEmpty()) {
        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
          (CellComparatorImpl) region.getCellComparator());
      }
    }
  }

  /**
   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
   * column family cells
   */
  private static class HeartbeatRegionScanner extends RegionScannerImpl {
    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
      throws IOException {
      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
    }

    @Override
    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
      boolean moreRows = super.nextRaw(outResults, context);
      HeartbeatHRegion.rowSleep();
      return moreRows;
    }

    @Override
    protected void initializeKVHeap(List<KeyValueScanner> scanners,
      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
      this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator());
      if (!joinedScanners.isEmpty()) {
        this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
      }
    }
  }

  /**
   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
   * cells. Useful for testing
   */
  private static final class HeartbeatKVHeap extends KeyValueHeap {
    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
      throws IOException {
      super(scanners, comparator);
    }

    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
      throws IOException {
      super(scanners, comparator);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
      // Sleep either before or after fetching the cells, depending on the test configuration.
      if (HeartbeatHRegion.sleepBeforeColumnFamily) {
        HeartbeatHRegion.columnFamilySleep();
      }
      boolean moreRows = super.next(result, context);
      if (!HeartbeatHRegion.sleepBeforeColumnFamily) {
        HeartbeatHRegion.columnFamilySleep();
      }
      return moreRows;
    }
  }

  /**
   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
   * cells.
   */
  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
      CellComparatorImpl comparator) throws IOException {
      super(scanners, comparator);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
      // Sleep either before or after fetching the cells, depending on the test configuration.
      if (HeartbeatHRegion.sleepBeforeColumnFamily) {
        HeartbeatHRegion.columnFamilySleep();
      }
      boolean moreRows = super.next(result, context);
      if (!HeartbeatHRegion.sleepBeforeColumnFamily) {
        HeartbeatHRegion.columnFamilySleep();
      }
      return moreRows;
    }
  }
}