001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.Callable; 028import org.apache.commons.lang3.exception.ExceptionUtils; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellComparator; 034import org.apache.hadoop.hbase.CellComparatorImpl; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HTestConst; 040import org.apache.hadoop.hbase.KeyValue; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.RegionInfo; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.filter.Filter; 050import org.apache.hadoop.hbase.filter.FilterBase; 051import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 052import org.apache.hadoop.hbase.testclassification.MediumTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.Threads; 055import org.apache.hadoop.hbase.wal.WAL; 056import org.junit.After; 057import org.junit.AfterClass; 058import org.junit.Before; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063 064import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 065import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 066 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 069 070/** 071 * Here we test to make sure that scans return the expected Results when the server is sending the 072 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent 073 * the scanner on the client side from timing out). A heartbeat message is sent from the server to 074 * the client when the server has exceeded the time limit during the processing of the scan. When 075 * the time limit is reached, the server will return to the Client whatever Results it has 076 * accumulated (potentially empty). 077 */ 078@Category(MediumTests.class) 079public class TestScannerHeartbeatMessages { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class); 084 085 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 086 087 private static Table TABLE = null; 088 089 /** 090 * Table configuration 091 */ 092 private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable"); 093 094 private static int NUM_ROWS = 5; 095 private static byte[] ROW = Bytes.toBytes("testRow"); 096 private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); 097 098 private static int NUM_FAMILIES = 4; 099 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 100 private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); 101 102 private static int NUM_QUALIFIERS = 3; 103 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 104 private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); 105 106 private static int VALUE_SIZE = 128; 107 private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); 108 109 // The time limit should be based on the rpc timeout at client, or the client will regards 110 // the request as timeout before server return a heartbeat. 111 private static int SERVER_TIMEOUT = 60000; 112 113 // Time, in milliseconds, that the client will wait for a response from the server before timing 114 // out. This value is used server side to determine when it is necessary to send a heartbeat 115 // message to the client. Time limit will be 500 ms. 116 private static int CLIENT_TIMEOUT = 1000; 117 118 // In this test, we sleep after reading each row. So we should make sure after we get some number 119 // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping. 120 private static int DEFAULT_ROW_SLEEP_TIME = 300; 121 122 // Similar with row sleep time. 123 private static int DEFAULT_CF_SLEEP_TIME = 300; 124 125 @BeforeClass 126 public static void setUpBeforeClass() throws Exception { 127 Configuration conf = TEST_UTIL.getConfiguration(); 128 129 conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); 130 conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); 131 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); 132 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); 133 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); 134 135 // Check the timeout condition after every cell 136 conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); 137 TEST_UTIL.startMiniCluster(1); 138 139 TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); 140 } 141 142 static Table createTestTable(TableName name, byte[][] rows, byte[][] families, 143 byte[][] qualifiers, byte[] cellValue) throws IOException { 144 Table ht = TEST_UTIL.createTable(name, families); 145 List<Put> puts = createPuts(rows, families, qualifiers, cellValue); 146 ht.put(puts); 147 ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); 148 return ht; 149 } 150 151 /** 152 * Make puts to put the input value into each combination of row, family, and qualifier 153 */ 154 static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, 155 byte[] value) throws IOException { 156 Put put; 157 ArrayList<Put> puts = new ArrayList<>(); 158 159 for (int row = 0; row < rows.length; row++) { 160 put = new Put(rows[row]); 161 for (int fam = 0; fam < families.length; fam++) { 162 for (int qual = 0; qual < qualifiers.length; qual++) { 163 KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); 164 put.add(kv); 165 } 166 } 167 puts.add(put); 168 } 169 170 return puts; 171 } 172 173 @AfterClass 174 public static void tearDownAfterClass() throws Exception { 175 TEST_UTIL.shutdownMiniCluster(); 176 } 177 178 @Before 179 public void setupBeforeTest() throws Exception { 180 disableSleeping(); 181 } 182 183 @After 184 public void teardownAfterTest() throws Exception { 185 disableSleeping(); 186 } 187 188 /** 189 * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass 190 * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are 191 * disabled, the test should throw an exception. 192 */ 193 private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException { 194 HeartbeatRPCServices.heartbeatsEnabled = true; 195 196 try { 197 testCallable.call(); 198 } catch (Exception e) { 199 fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:" 200 + ExceptionUtils.getStackTrace(e)); 201 } 202 203 HeartbeatRPCServices.heartbeatsEnabled = false; 204 try { 205 testCallable.call(); 206 } catch (Exception e) { 207 return; 208 } finally { 209 HeartbeatRPCServices.heartbeatsEnabled = true; 210 } 211 fail("Heartbeats messages are disabled, an exception should be thrown. If an exception " 212 + " is not thrown, the test case is not testing the importance of heartbeat messages"); 213 } 214 215 /** 216 * Test the case that the time limit for the scan is reached after each full row of cells is 217 * fetched. 218 */ 219 @Test 220 public void testHeartbeatBetweenRows() throws Exception { 221 testImportanceOfHeartbeats(new Callable<Void>() { 222 223 @Override 224 public Void call() throws Exception { 225 // Configure the scan so that it can read the entire table in a single RPC. We want to test 226 // the case where a scan stops on the server side due to a time limit 227 Scan scan = new Scan(); 228 scan.setMaxResultSize(Long.MAX_VALUE); 229 scan.setCaching(Integer.MAX_VALUE); 230 231 testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false); 232 return null; 233 } 234 }); 235 } 236 237 /** 238 * Test the case that the time limit for scans is reached in between column families 239 */ 240 @Test 241 public void testHeartbeatBetweenColumnFamilies() throws Exception { 242 testImportanceOfHeartbeats(new Callable<Void>() { 243 @Override 244 public Void call() throws Exception { 245 // Configure the scan so that it can read the entire table in a single RPC. We want to test 246 // the case where a scan stops on the server side due to a time limit 247 Scan baseScan = new Scan(); 248 baseScan.setMaxResultSize(Long.MAX_VALUE); 249 baseScan.setCaching(Integer.MAX_VALUE); 250 251 // Copy the scan before each test. When a scan object is used by a scanner, some of its 252 // fields may be changed such as start row 253 Scan scanCopy = new Scan(baseScan); 254 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false); 255 scanCopy = new Scan(baseScan); 256 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true); 257 return null; 258 } 259 }); 260 } 261 262 public static class SparseCellFilter extends FilterBase { 263 264 @Override 265 public ReturnCode filterCell(final Cell v) throws IOException { 266 try { 267 Thread.sleep(CLIENT_TIMEOUT / 2 + 100); 268 } catch (InterruptedException e) { 269 Thread.currentThread().interrupt(); 270 } 271 return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE 272 : ReturnCode.SKIP; 273 } 274 275 public static Filter parseFrom(final byte[] pbBytes) { 276 return new SparseCellFilter(); 277 } 278 } 279 280 public static class SparseRowFilter extends FilterBase { 281 282 @Override 283 public boolean filterRowKey(Cell cell) throws IOException { 284 try { 285 Thread.sleep(CLIENT_TIMEOUT / 2 - 100); 286 } catch (InterruptedException e) { 287 Thread.currentThread().interrupt(); 288 } 289 return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]); 290 } 291 292 public static Filter parseFrom(final byte[] pbBytes) { 293 return new SparseRowFilter(); 294 } 295 } 296 297 /** 298 * Test the case that there is a filter which filters most of cells 299 */ 300 @Test 301 public void testHeartbeatWithSparseCellFilter() throws Exception { 302 testImportanceOfHeartbeats(new Callable<Void>() { 303 @Override 304 public Void call() throws Exception { 305 Scan scan = new Scan(); 306 scan.setMaxResultSize(Long.MAX_VALUE); 307 scan.setCaching(Integer.MAX_VALUE); 308 scan.setFilter(new SparseCellFilter()); 309 ResultScanner scanner = TABLE.getScanner(scan); 310 int num = 0; 311 while (scanner.next() != null) { 312 num++; 313 } 314 assertEquals(1, num); 315 scanner.close(); 316 317 scan = new Scan(); 318 scan.setMaxResultSize(Long.MAX_VALUE); 319 scan.setCaching(Integer.MAX_VALUE); 320 scan.setFilter(new SparseCellFilter()); 321 scan.setAllowPartialResults(true); 322 scanner = TABLE.getScanner(scan); 323 num = 0; 324 while (scanner.next() != null) { 325 num++; 326 } 327 assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); 328 scanner.close(); 329 330 return null; 331 } 332 }); 333 } 334 335 /** 336 * Test the case that there is a filter which filters most of rows 337 */ 338 @Test 339 public void testHeartbeatWithSparseRowFilter() throws Exception { 340 testImportanceOfHeartbeats(new Callable<Void>() { 341 @Override 342 public Void call() throws Exception { 343 Scan scan = new Scan(); 344 scan.setMaxResultSize(Long.MAX_VALUE); 345 scan.setCaching(Integer.MAX_VALUE); 346 scan.setFilter(new SparseRowFilter()); 347 ResultScanner scanner = TABLE.getScanner(scan); 348 int num = 0; 349 while (scanner.next() != null) { 350 num++; 351 } 352 assertEquals(1, num); 353 scanner.close(); 354 355 return null; 356 } 357 }); 358 } 359 360 /** 361 * Test the equivalence of a scan versus the same scan executed when heartbeat messages are 362 * necessary 363 * @param scan The scan configuration being tested 364 * @param rowSleepTime The time to sleep between fetches of row cells 365 * @param cfSleepTime The time to sleep between fetches of column family cells 366 * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for 367 * that column family are fetched 368 */ 369 private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, 370 int cfSleepTime, boolean sleepBeforeCf) throws Exception { 371 disableSleeping(); 372 final ResultScanner scanner = TABLE.getScanner(scan); 373 final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan); 374 375 Result r1 = null; 376 Result r2 = null; 377 378 while ((r1 = scanner.next()) != null) { 379 // Enforce the specified sleep conditions during calls to the heartbeat scanner 380 configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf); 381 r2 = scannerWithHeartbeats.next(); 382 disableSleeping(); 383 384 assertTrue(r2 != null); 385 try { 386 Result.compareResults(r1, r2); 387 } catch (Exception e) { 388 fail(e.getMessage()); 389 } 390 } 391 392 assertTrue(scannerWithHeartbeats.next() == null); 393 scanner.close(); 394 scannerWithHeartbeats.close(); 395 } 396 397 /** 398 * Helper method for setting the time to sleep between rows and column families. If a sleep time 399 * is negative then that sleep will be disabled 400 */ 401 private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) { 402 HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0; 403 HeartbeatHRegion.rowSleepTime = rowSleepTime; 404 405 HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0; 406 HeartbeatHRegion.columnFamilySleepTime = cfSleepTime; 407 HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf; 408 } 409 410 /** 411 * Disable the sleeping mechanism server side. 412 */ 413 private static void disableSleeping() { 414 HeartbeatHRegion.sleepBetweenRows = false; 415 HeartbeatHRegion.sleepBetweenColumnFamilies = false; 416 } 417 418 /** 419 * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of 420 * {@link RSRpcServices} to allow us to toggle support for heartbeat messages 421 */ 422 private static class HeartbeatHRegionServer extends HRegionServer { 423 public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException { 424 super(conf); 425 } 426 427 @Override 428 protected RSRpcServices createRpcServices() throws IOException { 429 return new HeartbeatRPCServices(this); 430 } 431 } 432 433 /** 434 * Custom RSRpcServices instance that allows heartbeat support to be toggled 435 */ 436 private static class HeartbeatRPCServices extends RSRpcServices { 437 private static volatile boolean heartbeatsEnabled = true; 438 439 public HeartbeatRPCServices(HRegionServer rs) throws IOException { 440 super(rs); 441 } 442 443 @Override 444 public ScanResponse scan(RpcController controller, ScanRequest request) 445 throws ServiceException { 446 ScanRequest.Builder builder = ScanRequest.newBuilder(request); 447 builder.setClientHandlesHeartbeats(heartbeatsEnabled); 448 return super.scan(controller, builder.build()); 449 } 450 } 451 452 /** 453 * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times 454 * between fetches of row Results and/or column family cells. Useful for emulating an instance 455 * where the server is taking a long time to process a client's scan request 456 */ 457 private static class HeartbeatHRegion extends HRegion { 458 // Row sleeps occur AFTER each row worth of cells is retrieved. 459 private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME; 460 private static volatile boolean sleepBetweenRows = false; 461 462 // The sleep for column families can be initiated before or after we fetch the cells for the 463 // column family. If the sleep occurs BEFORE then the time limits will be reached inside 464 // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time 465 // limit will be reached inside RegionScanner after all the cells for a column family have been 466 // retrieved. 467 private static volatile boolean sleepBeforeColumnFamily = false; 468 private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME; 469 private static volatile boolean sleepBetweenColumnFamilies = false; 470 471 public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 472 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 473 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 474 } 475 476 public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam, 477 TableDescriptor htd, RegionServerServices rsServices) { 478 super(fs, wal, confParam, htd, rsServices); 479 } 480 481 private static void columnFamilySleep() { 482 if (sleepBetweenColumnFamilies) { 483 Threads.sleepWithoutInterrupt(columnFamilySleepTime); 484 } 485 } 486 487 private static void rowSleep() { 488 if (sleepBetweenRows) { 489 Threads.sleepWithoutInterrupt(rowSleepTime); 490 } 491 } 492 493 // Instantiate the custom heartbeat region scanners 494 @Override 495 protected RegionScannerImpl instantiateRegionScanner(Scan scan, 496 List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException { 497 if (scan.isReversed()) { 498 if (scan.getFilter() != null) { 499 scan.getFilter().setReversed(true); 500 } 501 return new HeartbeatReversedRegionScanner(scan, additionalScanners, this); 502 } 503 return new HeartbeatRegionScanner(scan, additionalScanners, this); 504 } 505 } 506 507 /** 508 * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results 509 * and/or column family cells 510 */ 511 private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl { 512 HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, 513 HRegion region) throws IOException { 514 super(scan, additionalScanners, region); 515 } 516 517 @Override 518 public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException { 519 boolean moreRows = super.nextRaw(outResults, context); 520 HeartbeatHRegion.rowSleep(); 521 return moreRows; 522 } 523 524 @Override 525 protected void initializeKVHeap(List<KeyValueScanner> scanners, 526 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 527 this.storeHeap = 528 new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator()); 529 if (!joinedScanners.isEmpty()) { 530 this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners, 531 (CellComparatorImpl) region.getCellComparator()); 532 } 533 } 534 } 535 536 /** 537 * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or 538 * column family cells 539 */ 540 private static class HeartbeatRegionScanner extends RegionScannerImpl { 541 HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) 542 throws IOException { 543 region.super(scan, additionalScanners, region); 544 } 545 546 @Override 547 public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException { 548 boolean moreRows = super.nextRaw(outResults, context); 549 HeartbeatHRegion.rowSleep(); 550 return moreRows; 551 } 552 553 @Override 554 protected void initializeKVHeap(List<KeyValueScanner> scanners, 555 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { 556 this.storeHeap = 557 new HeartbeatKVHeap(scanners, region.getCellComparator()); 558 if (!joinedScanners.isEmpty()) { 559 this.joinedHeap = 560 new HeartbeatKVHeap(joinedScanners, region.getCellComparator()); 561 } 562 } 563 } 564 565 /** 566 * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family 567 * cells. Useful for testing 568 */ 569 private static final class HeartbeatKVHeap extends KeyValueHeap { 570 public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 571 throws IOException { 572 super(scanners, comparator); 573 } 574 575 HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator) 576 throws IOException { 577 super(scanners, comparator); 578 } 579 580 @Override 581 public boolean next(List<Cell> result, ScannerContext context) throws IOException { 582 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 583 boolean moreRows = super.next(result, context); 584 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 585 return moreRows; 586 } 587 } 588 589 /** 590 * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family 591 * cells. 592 */ 593 private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap { 594 public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners, 595 CellComparatorImpl comparator) throws IOException { 596 super(scanners, comparator); 597 } 598 599 @Override 600 public boolean next(List<Cell> result, ScannerContext context) throws IOException { 601 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 602 boolean moreRows = super.next(result, context); 603 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep(); 604 return moreRows; 605 } 606 } 607}