/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Here we test to make sure that scans return the expected Results when the server is sending the
 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
 * the client when the server has exceeded the time limit during the processing of the scan. When
 * the time limit is reached, the server will return to the Client whatever Results it has
 * accumulated (potentially empty).
 */
@Category(LargeTests.class)
public class TestScannerHeartbeatMessages {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  /** Client connection configured with the short {@link #CLIENT_TIMEOUT}; set in class setup. */
  private static AsyncConnection CONN;

  /**
   * Table configuration
   */
  private static final TableName TABLE_NAME =
    TableName.valueOf("testScannerHeartbeatMessagesTable");

  private static final int NUM_ROWS = 5;
  private static final byte[] ROW = Bytes.toBytes("testRow");
  private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);

  private static final int NUM_FAMILIES = 4;
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
  private static final byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);

  private static final int NUM_QUALIFIERS = 3;
  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static final byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);

  private static final int VALUE_SIZE = 128;
  private static final byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);

  // The time limit should be based on the rpc timeout of the client; otherwise the client would
  // regard the request as timed out before the server returns a heartbeat. This (long) value is
  // what the mini cluster itself is configured with.
  private static final int SERVER_TIMEOUT = 60000;

  // Time, in milliseconds, that the client will wait for a response from the server before timing
  // out. This value is used server side to determine when it is necessary to send a heartbeat
  // message to the client. Time limit will be 500 ms.
  private static final int CLIENT_TIMEOUT = 1000;

  // In this test the region scanner sleeps after reading each row. The sleep must be chosen so
  // that after some number of rows (and the same number of sleeps) the time limit is reached,
  // while one further sleep does not make the request time out.
  private static final int DEFAULT_ROW_SLEEP_TIME = 300;

  // Same reasoning as for the row sleep time.
  private static final int DEFAULT_CF_SLEEP_TIME = 300;

  /**
   * Starts a one-node mini cluster that uses the heartbeat-aware region and region server
   * implementations, loads the test table, and opens the short-timeout client connection.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();

    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);

    // Check the timeout condition after every cell
    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
    TEST_UTIL.startMiniCluster(1);

    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);

    // The connection used by the tests gets its own configuration with the short client timeouts.
    Configuration newConf = new Configuration(conf);
    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
  }

  /**
   * Creates the table and writes one cell for every combination of row, family and qualifier.
   * @param name       name of the table to create
   * @param rows       row keys to write
   * @param families   column families of the table
   * @param qualifiers qualifiers written into every family
   * @param cellValue  value stored in every cell
   * @throws IOException if creating the table or writing to it fails
   */
  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
    byte[] cellValue) throws IOException {
    // Close the Table handle once the data is loaded instead of leaking it.
    try (Table ht = TEST_UTIL.createTable(name, families)) {
      ht.put(createPuts(rows, families, qualifiers, cellValue));
    }
  }

  /**
   * Make puts to put the input value into each combination of row, family, and qualifier
   * @return one {@link Put} per row; the index of a qualifier is used as the timestamp of its
   *         cells
   */
  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
    byte[] value) throws IOException {
    ArrayList<Put> puts = new ArrayList<>(rows.length);

    for (byte[] row : rows) {
      Put put = new Put(row);
      for (byte[] family : families) {
        // An index loop is needed here because the qualifier index doubles as the timestamp.
        for (int qual = 0; qual < qualifiers.length; qual++) {
          put.add(new KeyValue(row, family, qualifiers[qual], qual, value));
        }
      }
      puts.add(put);
    }

    return puts;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    Closeables.close(CONN, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setupBeforeTest() throws Exception {
    disableSleeping();
  }

  @After
  public void teardownAfterTest() throws Exception {
    disableSleeping();
  }

  /**
   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
   * disabled, the test should throw an exception.
   */
  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
    HeartbeatRPCServices.heartbeatsEnabled = true;

    try {
      testCallable.call();
    } catch (Exception e) {
      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
        + ExceptionUtils.getStackTrace(e));
    }

    HeartbeatRPCServices.heartbeatsEnabled = false;
    try {
      testCallable.call();
    } catch (Exception e) {
      // Expected: without heartbeats the client must run into its timeout.
      return;
    } finally {
      // Always restore the default so later tests start with heartbeats enabled.
      HeartbeatRPCServices.heartbeatsEnabled = true;
    }
    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
      + " is not thrown, the test case is not testing the importance of heartbeat messages");
  }

  /**
   * Builds a scan that is able to read the entire table in a single RPC, so that the only reason
   * for the server to stop early is the time limit.
   */
  private static Scan newSingleRpcScan() {
    Scan scan = new Scan();
    scan.setMaxResultSize(Long.MAX_VALUE);
    scan.setCaching(Integer.MAX_VALUE);
    return scan;
  }

  /**
   * Runs the scan to completion on the test table and counts the returned Results.
   * @param scan the scan to execute
   * @return the number of non-null Results returned by the scanner
   * @throws IOException if the scan fails, e.g. because the client timed out
   */
  private static int countResults(Scan scan) throws IOException {
    try (ScanPerNextResultScanner scanner =
      new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
      int num = 0;
      while (scanner.next() != null) {
        num++;
      }
      return num;
    }
  }

  /**
   * Test the case that the time limit for the scan is reached after each full row of cells is
   * fetched.
   */
  @Test
  public void testHeartbeatBetweenRows() throws Exception {
    testImportanceOfHeartbeats(new Callable<Void>() {

      @Override
      public Void call() throws Exception {
        // Configure the scan so that it can read the entire table in a single RPC. We want to test
        // the case where a scan stops on the server side due to a time limit
        testEquivalenceOfScanWithHeartbeats(newSingleRpcScan(), DEFAULT_ROW_SLEEP_TIME, -1, false);
        return null;
      }
    });
  }

  /**
   * Test the case that the time limit for scans is reached in between column families
   */
  @Test
  public void testHeartbeatBetweenColumnFamilies() throws Exception {
    testImportanceOfHeartbeats(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        // Use a fresh scan for each run. When a scan object is used by a scanner, some of its
        // fields may be changed such as start row
        testEquivalenceOfScanWithHeartbeats(newSingleRpcScan(), -1, DEFAULT_CF_SLEEP_TIME, false);
        testEquivalenceOfScanWithHeartbeats(newSingleRpcScan(), -1, DEFAULT_CF_SLEEP_TIME, true);
        return null;
      }
    });
  }

  /**
   * Filter that sleeps for a little more than half of the client timeout on every cell and only
   * includes the cells of the last test row.
   */
  public static class SparseCellFilter extends FilterBase {

    @Override
    public ReturnCode filterCell(final Cell v) throws IOException {
      try {
        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      boolean isLastRow = Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]);
      return isLastRow ? ReturnCode.INCLUDE : ReturnCode.SKIP;
    }

    /** Deserialization hook; the filter carries no state, so the bytes are ignored. */
    public static Filter parseFrom(final byte[] pbBytes) {
      return new SparseCellFilter();
    }
  }

  /**
   * Filter that sleeps for a little less than half of the client timeout on every row key and
   * filters out every row except the last test row.
   */
  public static class SparseRowFilter extends FilterBase {

    @Override
    public boolean filterRowKey(Cell cell) throws IOException {
      try {
        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
    }

    /** Deserialization hook; the filter carries no state, so the bytes are ignored. */
    public static Filter parseFrom(final byte[] pbBytes) {
      return new SparseRowFilter();
    }
  }

  /**
   * Test the case that there is a filter which filters most of cells
   */
  @Test
  public void testHeartbeatWithSparseCellFilter() throws Exception {
    testImportanceOfHeartbeats(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        // Without partial results only the single matching row comes back.
        Scan scan = newSingleRpcScan();
        scan.setFilter(new SparseCellFilter());
        assertEquals(1, countResults(scan));

        // With partial results allowed, one Result per cell of the matching row is expected.
        scan = newSingleRpcScan();
        scan.setFilter(new SparseCellFilter());
        scan.setAllowPartialResults(true);
        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, countResults(scan));

        return null;
      }
    });
  }

  /**
   * Test the case that there is a filter which filters most of rows
   */
  @Test
  public void testHeartbeatWithSparseRowFilter() throws Exception {
    testImportanceOfHeartbeats(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        Scan scan = newSingleRpcScan();
        scan.setFilter(new SparseRowFilter());
        assertEquals(1, countResults(scan));

        return null;
      }
    });
  }

  /**
   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
   * necessary
   * @param scan          The scan configuration being tested
   * @param rowSleepTime  The time to sleep between fetches of row cells
   * @param cfSleepTime   The time to sleep between fetches of column family cells
   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
   *                      that column family are fetched
   */
  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
    int cfSleepTime, boolean sleepBeforeCf) throws Exception {
    disableSleeping();
    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);

    // try-with-resources guarantees both scanners are closed even when an assertion fails or the
    // heartbeat scanner times out.
    try (ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
      ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan)) {
      Result expected;
      while ((expected = scanner.next()) != null) {
        // Enforce the specified sleep conditions during calls to the heartbeat scanner only, and
        // make sure they are lifted again even if the call fails.
        Result actual;
        configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
        try {
          actual = scannerWithHeartbeats.next();
        } finally {
          disableSleeping();
        }

        assertTrue("Scanner with heartbeats returned fewer results than the plain scanner",
          actual != null);
        try {
          Result.compareResults(expected, actual);
        } catch (Exception e) {
          // Keep the full trace; the message alone does not say where the comparison failed.
          fail("Results of plain and heartbeat scanner differ. Exception trace:"
            + ExceptionUtils.getStackTrace(e));
        }
      }

      assertTrue("Scanner with heartbeats returned more results than the plain scanner",
        scannerWithHeartbeats.next() == null);
    }
  }

  /**
   * Helper method for setting the time to sleep between rows and column families. If a sleep time
   * is negative then that sleep will be disabled
   */
  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
    HeartbeatHRegion.rowSleepTime = rowSleepTime;

    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
  }

  /**
   * Disable the sleeping mechanism server side.
   */
  private static void disableSleeping() {
    HeartbeatHRegion.sleepBetweenRows = false;
    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
  }

  /**
   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
   */
  private static class HeartbeatHRegionServer extends HRegionServer {
    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new HeartbeatRPCServices(this);
    }
  }

  /**
   * Custom RSRpcServices instance that allows heartbeat support to be toggled
   */
  private static class HeartbeatRPCServices extends RSRpcServices {
    private static volatile boolean heartbeatsEnabled = true;

    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
      super(rs);
    }

    /**
     * Rewrites the incoming request so that its "client handles heartbeats" flag reflects the
     * current value of {@link #heartbeatsEnabled}, then delegates to the regular implementation.
     */
    @Override
    public ScanResponse scan(RpcController controller, ScanRequest request)
      throws ServiceException {
      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
      return super.scan(controller, builder.build());
    }
  }

  /**
   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
   * between fetches of row Results and/or column family cells. Useful for emulating an instance
   * where the server is taking a long time to process a client's scan request
   */
  private static class HeartbeatHRegion extends HRegion {
    // Row sleeps occur AFTER each row worth of cells is retrieved.
    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
    private static volatile boolean sleepBetweenRows = false;

    // The sleep for column families can be initiated before or after we fetch the cells for the
    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
    // limit will be reached inside RegionScanner after all the cells for a column family have been
    // retrieved.
    private static volatile boolean sleepBeforeColumnFamily = false;
    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
    private static volatile boolean sleepBetweenColumnFamilies = false;

    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
    }

    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
      TableDescriptor htd, RegionServerServices rsServices) {
      super(fs, wal, confParam, htd, rsServices);
    }

    /** Sleeps for the configured column family sleep time if column family sleeps are enabled. */
    private static void columnFamilySleep() {
      if (sleepBetweenColumnFamilies) {
        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
      }
    }

    /** Sleeps for the configured row sleep time if row sleeps are enabled. */
    private static void rowSleep() {
      if (sleepBetweenRows) {
        Threads.sleepWithoutInterrupt(rowSleepTime);
      }
    }

    // Instantiate the custom heartbeat region scanners
    @Override
    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
      if (scan.isReversed()) {
        if (scan.getFilter() != null) {
          scan.getFilter().setReversed(true);
        }
        // Forward the nonces handed to us instead of silently replacing them with NO_NONCE.
        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this, nonceGroup,
          nonce);
      }
      return new HeartbeatRegionScanner(scan, additionalScanners, this, nonceGroup, nonce);
    }
  }

  /**
   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
   * and/or column family cells
   */
  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
      HRegion region, long nonceGroup, long nonce) throws IOException {
      super(scan, additionalScanners, region, nonceGroup, nonce);
    }

    @Override
    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
      boolean moreRows = super.nextRaw(outResults, context);
      HeartbeatHRegion.rowSleep();
      return moreRows;
    }

    @Override
    protected void initializeKVHeap(List<KeyValueScanner> scanners,
      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
      this.storeHeap =
        new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
      if (!joinedScanners.isEmpty()) {
        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
          (CellComparatorImpl) region.getCellComparator());
      }
    }
  }

  /**
   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
   * column family cells
   */
  private static class HeartbeatRegionScanner extends RegionScannerImpl {
    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
      long nonceGroup, long nonce) throws IOException {
      super(scan, additionalScanners, region, nonceGroup, nonce);
    }

    @Override
    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
      boolean moreRows = super.nextRaw(outResults, context);
      HeartbeatHRegion.rowSleep();
      return moreRows;
    }

    @Override
    protected void initializeKVHeap(List<KeyValueScanner> scanners,
      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
      this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator());
      if (!joinedScanners.isEmpty()) {
        this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
      }
    }
  }

  /**
   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
   * cells. Useful for testing
   */
  private static final class HeartbeatKVHeap extends KeyValueHeap {
    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
      throws IOException {
      super(scanners, comparator);
    }

    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
      throws IOException {
      super(scanners, comparator);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
      // Read the volatile flag once so that a concurrent toggle cannot lead to two sleeps (or
      // none) within a single call.
      final boolean sleepFirst = HeartbeatHRegion.sleepBeforeColumnFamily;
      if (sleepFirst) {
        HeartbeatHRegion.columnFamilySleep();
      }
      boolean moreRows = super.next(result, context);
      if (!sleepFirst) {
        HeartbeatHRegion.columnFamilySleep();
      }
      return moreRows;
    }
  }

  /**
   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
   * cells.
   */
  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
      CellComparatorImpl comparator) throws IOException {
      super(scanners, comparator);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
      // Read the volatile flag once so that a concurrent toggle cannot lead to two sleeps (or
      // none) within a single call.
      final boolean sleepFirst = HeartbeatHRegion.sleepBeforeColumnFamily;
      if (sleepFirst) {
        HeartbeatHRegion.columnFamilySleep();
      }
      boolean moreRows = super.next(result, context);
      if (!sleepFirst) {
        HeartbeatHRegion.columnFamilySleep();
      }
      return moreRows;
    }
  }
}