001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.Callable; 030import org.apache.commons.lang3.exception.ExceptionUtils; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellComparator; 036import org.apache.hadoop.hbase.CellComparatorImpl; 037import org.apache.hadoop.hbase.CellUtil; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HTestConst; 042import org.apache.hadoop.hbase.KeyValue; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionFactory; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.ResultScanner; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.filter.Filter; 054import org.apache.hadoop.hbase.filter.FilterBase; 055import org.apache.hadoop.hbase.ipc.HBaseRpcController; 056import org.apache.hadoop.hbase.ipc.RpcCall; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.Threads; 061import org.apache.hadoop.hbase.wal.WAL; 062import org.junit.After; 063import org.junit.AfterClass; 064import org.junit.Before; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.mockito.Mockito; 070 071import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 072import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 073 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 076 077/** 078 * Here we test to make sure that scans return the expected Results when the server is sending the 079 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent 080 * the scanner on the client side from timing out). A heartbeat message is sent from the server to 081 * the client when the server has exceeded the time limit during the processing of the scan. When 082 * the time limit is reached, the server will return to the Client whatever Results it has 083 * accumulated (potentially empty). 084 */ 085@Category(LargeTests.class) 086public class TestScannerHeartbeatMessages { 087 088 @ClassRule 089 public static final HBaseClassTestRule CLASS_RULE = 090 HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class); 091 092 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 093 094 private static Table TABLE = null; 095 private static Connection CONN = null; 096 097 /** 098 * Table configuration 099 */ 100 private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); 101 102 private static int NUM_ROWS = 5; 103 private static byte[] ROW = Bytes.toBytes("testRow"); 104 private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); 105 106 private static int NUM_FAMILIES = 4; 107 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 108 private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); 109 110 private static int NUM_QUALIFIERS = 3; 111 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 112 private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); 113 114 private static int VALUE_SIZE = 128; 115 private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); 116 117 // The time limit should be based on the rpc timeout at client, or the client will regards 118 // the request as timeout before server return a heartbeat. 119 private static int SERVER_TIMEOUT = 60000; 120 121 // Time, in milliseconds, that the client will wait for a response from the server before timing 122 // out. This value is used server side to determine when it is necessary to send a heartbeat 123 // message to the client. Time limit will be 500 ms. 124 private static int CLIENT_TIMEOUT = 1000; 125 126 // In this test, we sleep after reading each row. So we should make sure after we get some number 127 // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping. 128 private static int DEFAULT_ROW_SLEEP_TIME = 300; 129 130 // Similar with row sleep time. 131 private static int DEFAULT_CF_SLEEP_TIME = 300; 132 133 @BeforeClass 134 public static void setUpBeforeClass() throws Exception { 135 Configuration conf = TEST_UTIL.getConfiguration(); 136 137 conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); 138 conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); 139 // setting these here for usage on the server side. will override for client side below 140 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); 141 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); 142 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); 143 144 // Check the timeout condition after every cell 145 conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); 146 TEST_UTIL.startMiniCluster(1); 147 148 // set client timeout for client side, we want it to be less than server side. 149 Configuration clientConf = new Configuration(conf); 150 clientConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); 151 CONN = ConnectionFactory.createConnection(clientConf); 152 TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); 153 } 154 155 @Test 156 public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException { 157 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); 158 RSRpcServices services = new RSRpcServices(rs); 159 RpcCall mockRpcCall = Mockito.mock(RpcCall.class); 160 // first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing) 161 // finally, 25 is fatal levels of queueing -- exceeding timeout 162 when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L); 163 164 // assume timeout of 100ms 165 HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class); 166 when(mockController.getCallTimeout()).thenReturn(100); 167 168 // current time is 100, which we'll subtract from 90 and 50 to generate some time deltas 169 EnvironmentEdgeManager.injectEdge(() -> 200L); 170 171 try { 172 // we queued for 20ms, leaving 80ms of timeout, which we divide by 2 173 assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true)); 174 // we queued for 80ms, leaving 20ms of timeout, which we divide by 2 175 assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true)); 176 // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum 177 assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 178 services.getTimeLimit(mockRpcCall, mockController, true)); 179 // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop 180 // timed out calls in the queue. in this case we still fallback on default minimum for now. 181 assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 182 services.getTimeLimit(mockRpcCall, mockController, true)); 183 } finally { 184 EnvironmentEdgeManager.reset(); 185 } 186 187 } 188 189 static Table createTestTable(TableName name, byte[][] rows, byte[][] families, 190 byte[][] qualifiers, byte[] cellValue) throws IOException { 191 TEST_UTIL.createTable(name, families); 192 Table ht = CONN.getTable(name); 193 List<Put> puts = createPuts(rows, families, qualifiers, cellValue); 194 ht.put(puts); 195 return ht; 196 } 197 198 /** 199 * Make puts to put the input value into each combination of row, family, and qualifier 200 */ 201 static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, 202 byte[] value) throws IOException { 203 Put put; 204 ArrayList<Put> puts = new ArrayList<>(); 205 206 for (int row = 0; row < rows.length; row++) { 207 put = new Put(rows[row]); 208 for (int fam = 0; fam < families.length; fam++) { 209 for (int qual = 0; qual < qualifiers.length; qual++) { 210 KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); 211 put.add(kv); 212 } 213 } 214 puts.add(put); 215 } 216 217 return puts; 218 } 219 220 @AfterClass 221 public static void tearDownAfterClass() throws Exception { 222 CONN.close(); 223 TEST_UTIL.shutdownMiniCluster(); 224 } 225 226 @Before 227 public void setupBeforeTest() throws Exception { 228 disableSleeping(); 229 } 230 231 @After 232 public void teardownAfterTest() throws Exception { 233 disableSleeping(); 234 } 235 236 /** 237 * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass 238 * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are 239 * disabled, the test should throw an exception. 240 */ 241 private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException { 242 HeartbeatRPCServices.heartbeatsEnabled = true; 243 244 try { 245 testCallable.call(); 246 } catch (Exception e) { 247 fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:" 248 + ExceptionUtils.getStackTrace(e)); 249 } 250 251 HeartbeatRPCServices.heartbeatsEnabled = false; 252 try { 253 testCallable.call(); 254 } catch (Exception e) { 255 return; 256 } finally { 257 HeartbeatRPCServices.heartbeatsEnabled = true; 258 } 259 fail("Heartbeats messages are disabled, an exception should be thrown. If an exception " 260 + " is not thrown, the test case is not testing the importance of heartbeat messages"); 261 } 262 263 /** 264 * Test the case that the time limit for the scan is reached after each full row of cells is 265 * fetched. 266 */ 267 @Test 268 public void testHeartbeatBetweenRows() throws Exception { 269 testImportanceOfHeartbeats(new Callable<Void>() { 270 271 @Override 272 public Void call() throws Exception { 273 // Configure the scan so that it can read the entire table in a single RPC. We want to test 274 // the case where a scan stops on the server side due to a time limit 275 Scan scan = new Scan(); 276 scan.setMaxResultSize(Long.MAX_VALUE); 277 scan.setCaching(Integer.MAX_VALUE); 278 279 testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false); 280 return null; 281 } 282 }); 283 } 284 285 /** 286 * Test the case that the time limit for scans is reached in between column families 287 */ 288 @Test 289 public void testHeartbeatBetweenColumnFamilies() throws Exception { 290 testImportanceOfHeartbeats(new Callable<Void>() { 291 @Override 292 public Void call() throws Exception { 293 // Configure the scan so that it can read the entire table in a single RPC. We want to test 294 // the case where a scan stops on the server side due to a time limit 295 Scan baseScan = new Scan(); 296 baseScan.setMaxResultSize(Long.MAX_VALUE); 297 baseScan.setCaching(Integer.MAX_VALUE); 298 299 // Copy the scan before each test. When a scan object is used by a scanner, some of its 300 // fields may be changed such as start row 301 Scan scanCopy = new Scan(baseScan); 302 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false); 303 scanCopy = new Scan(baseScan); 304 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true); 305 return null; 306 } 307 }); 308 } 309 310 public static class SparseCellFilter extends FilterBase { 311 312 @Override 313 public ReturnCode filterCell(final Cell v) throws IOException { 314 try { 315 Thread.sleep(CLIENT_TIMEOUT / 2 + 100); 316 } catch (InterruptedException e) { 317 Thread.currentThread().interrupt(); 318 } 319 return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) 320 ? ReturnCode.INCLUDE 321 : ReturnCode.SKIP; 322 } 323 324 public static Filter parseFrom(final byte[] pbBytes) { 325 return new SparseCellFilter(); 326 } 327 } 328 329 public static class SparseRowFilter extends FilterBase { 330 331 @Override 332 public boolean filterRowKey(Cell cell) throws IOException { 333 try { 334 Thread.sleep(CLIENT_TIMEOUT / 2 - 100); 335 } catch (InterruptedException e) { 336 Thread.currentThread().interrupt(); 337 } 338 return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]); 339 } 340 341 public static Filter parseFrom(final byte[] pbBytes) { 342 return new SparseRowFilter(); 343 } 344 } 345 346 /** 347 * Test the case that there is a filter which filters most of cells 348 */ 349 @Test 350 public void testHeartbeatWithSparseCellFilter() throws Exception { 351 testImportanceOfHeartbeats(new Callable<Void>() { 352 @Override 353 public Void call() throws Exception { 354 Scan scan = new Scan(); 355 scan.setMaxResultSize(Long.MAX_VALUE); 356 scan.setCaching(Integer.MAX_VALUE); 357 scan.setFilter(new SparseCellFilter()); 358 ResultScanner scanner = TABLE.getScanner(scan); 359 int num = 0; 360 while (scanner.next() != null) { 361 num++; 362 } 363 assertEquals(1, num); 364 scanner.close(); 365 366 scan = new Scan(); 367 scan.setMaxResultSize(Long.MAX_VALUE); 368 scan.setCaching(Integer.MAX_VALUE); 369 scan.setFilter(new SparseCellFilter()); 370 scan.setAllowPartialResults(true); 371 scanner = TABLE.getScanner(scan); 372 num = 0; 373 while (scanner.next() != null) { 374 num++; 375 } 376 assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); 377 scanner.close(); 378 379 return null; 380 } 381 }); 382 } 383 384 /** 385 * Test the case that there is a filter which filters most of rows 386 */ 387 @Test 388 public void testHeartbeatWithSparseRowFilter() throws Exception { 389 testImportanceOfHeartbeats(new Callable<Void>() { 390 @Override 391 public Void call() throws Exception { 392 Scan scan = new Scan(); 393 scan.setMaxResultSize(Long.MAX_VALUE); 394 scan.setCaching(Integer.MAX_VALUE); 395 scan.setFilter(new SparseRowFilter()); 396 ResultScanner scanner = TABLE.getScanner(scan); 397 int num = 0; 398 while (scanner.next() != null) { 399 num++; 400 } 401 assertEquals(1, num); 402 scanner.close(); 403 404 return null; 405 } 406 }); 407 } 408 409 /** 410 * Test the equivalence of a scan versus the same scan executed when heartbeat messages are 411 * necessary 412 * @param scan The scan configuration being tested 413 * @param rowSleepTime The time to sleep between fetches of row cells 414 * @param cfSleepTime The time to sleep between fetches of column family cells 415 * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for 416 * that column family are fetched 417 */ 418 private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, 419 int cfSleepTime, boolean sleepBeforeCf) throws Exception { 420 disableSleeping(); 421 final ResultScanner scanner = TABLE.getScanner(scan); 422 final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan); 423 424 Result r1 = null; 425 Result r2 = null; 426 427 while ((r1 = scanner.next()) != null) { 428 // Enforce the specified sleep conditions during calls to the heartbeat scanner 429 configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf); 430 r2 = scannerWithHeartbeats.next(); 431 disableSleeping(); 432 433 assertTrue(r2 != null); 434 try { 435 Result.compareResults(r1, r2); 436 } catch (Exception e) { 437 fail(e.getMessage()); 438 } 439 } 440 441 assertTrue(scannerWithHeartbeats.next() == null); 442 scanner.close(); 443 scannerWithHeartbeats.close(); 444 } 445 446 /** 447 * Helper method for setting the time to sleep between rows and column families. If a sleep time 448 * is negative then that sleep will be disabled 449 */ 450 private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) { 451 HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0; 452 HeartbeatHRegion.rowSleepTime = rowSleepTime; 453 454 HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0; 455 HeartbeatHRegion.columnFamilySleepTime = cfSleepTime; 456 HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf; 457 } 458 459 /** 460 * Disable the sleeping mechanism server side. 461 */ 462 private static void disableSleeping() { 463 HeartbeatHRegion.sleepBetweenRows = false; 464 HeartbeatHRegion.sleepBetweenColumnFamilies = false; 465 } 466 467 /** 468 * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of 469 * {@link RSRpcServices} to allow us to toggle support for heartbeat messages 470 */ 471 private static class HeartbeatHRegionServer extends HRegionServer { 472 public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException { 473 super(conf); 474 } 475 476 @Override 477 protected RSRpcServices createRpcServices() throws IOException { 478 return new HeartbeatRPCServices(this); 479 } 480 } 481 482 /** 483 * Custom RSRpcServices instance that allows heartbeat support to be toggled 484 */ 485 private static class HeartbeatRPCServices extends RSRpcServices { 486 private static volatile boolean heartbeatsEnabled = true; 487 488 public HeartbeatRPCServices(HRegionServer rs) throws IOException { 489 super(rs); 490 } 491 492 @Override 493 public ScanResponse scan(RpcController controller, ScanRequest request) 494 throws ServiceException { 495 ScanRequest.Builder builder = ScanRequest.newBuilder(request); 496 builder.setClientHandlesHeartbeats(heartbeatsEnabled); 497 return super.scan(controller, builder.build()); 498 } 499 } 500 501 /** 502 * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times 503 * between fetches of row Results and/or column family cells. Useful for emulating an instance 504 * where the server is taking a long time to process a client's scan request 505 */ 506 private static class HeartbeatHRegion extends HRegion { 507 // Row sleeps occur AFTER each row worth of cells is retrieved. 508 private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; 509 private static volatile boolean sleepBetweenRows = false; 510 511 // The sleep for column families can be initiated before or after we fetch the cells for the 512 // column family. If the sleep occurs BEFORE then the time limits will be reached inside 513 // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time 514 // limit will be reached inside RegionScanner after all the cells for a column family have been 515 // retrieved. 516 private static volatile boolean sleepBeforeColumnFamily = false; 517 private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; 518 private static volatile boolean sleepBetweenColumnFamilies = false; 519 520 public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 521 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 522 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 523 } 524 525 public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, 526 TableDescriptor htd, RegionServerServices rsServices) { 527 super(fs, wal, confParam, htd, rsServices); 528 } 529 530 private static void columnFamilySleep() { 531 if (sleepBetweenColumnFamilies) { 532 Threads.sleepWithoutInterrupt(columnFamilySleepTime); 533 } 534 } 535 536 private static void rowSleep() { 537 if (sleepBetweenRows) { 538 Threads.sleepWithoutInterrupt(rowSleepTime); 539 } 540 } 541 542 // Instantiate the custom heartbeat region scanners 543 @Override 544 protected RegionScannerImpl instantiateRegionScanner(Scan scan, 545 List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException { 546 if (scan.isReversed()) { 547 if (scan.getFilter() != null) { 548 scan.getFilter().setReversed(true); 549 } 550 return new HeartbeatReversedRegionScanner(scan, additionalScanners, this); 551 } 552 return new HeartbeatRegionScanner(scan, additionalScanners, this); 553 } 554 } 555 556 /** 557 * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results 558 * and/or column family cells 559 */ 560 private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl { 561 HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, 562 HRegion region) throws IOException { 563 super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE); 564 } 565 566 @Override 567 public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException { 568 boolean moreRows = super.nextRaw(outResults, context); 569 HeartbeatHRegion.rowSleep(); 570 return moreRows; 571 } 572 573 @Override 574 protected void initializeKVHeap(List<KeyValueScanner> scanners, 575 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 576 this.storeHeap = 577 new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator()); 578 if (!joinedScanners.isEmpty()) { 579 this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, 580 (CellComparatorImpl) region.getCellComparator()); 581 } 582 } 583 } 584 585 /** 586 * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or 587 * column family cells 588 */ 589 private static class HeartbeatRegionScanner extends RegionScannerImpl { 590 HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) 591 throws IOException { 592 super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE); 593 } 594 595 @Override 596 public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException { 597 boolean moreRows = super.nextRaw(outResults, context); 598 HeartbeatHRegion.rowSleep(); 599 return moreRows; 600 } 601 602 @Override 603 protected void initializeKVHeap(List<KeyValueScanner> scanners, 604 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 605 this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator()); 606 if (!joinedScanners.isEmpty()) { 607 this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator()); 608 } 609 } 610 } 611 612 /** 613 * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family 614 * cells. Useful for testing 615 */ 616 private static final class HeartbeatKVHeap extends KeyValueHeap { 617 public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 618 throws IOException { 619 super(scanners, comparator); 620 } 621 622 HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator) 623 throws IOException { 624 super(scanners, comparator); 625 } 626 627 @Override 628 public boolean next(List<Cell> result, ScannerContext context) throws IOException { 629 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 630 boolean moreRows = super.next(result, context); 631 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 632 return moreRows; 633 } 634 } 635 636 /** 637 * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family 638 * cells. 639 */ 640 private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap { 641 public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners, 642 CellComparatorImpl comparator) throws IOException { 643 super(scanners, comparator); 644 } 645 646 @Override 647 public boolean next(List<Cell> result, ScannerContext context) throws IOException { 648 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 649 boolean moreRows = super.next(result, context); 650 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 651 return moreRows; 652 } 653 } 654}