001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.Callable;
028import org.apache.commons.lang3.exception.ExceptionUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparator;
034import org.apache.hadoop.hbase.CellComparatorImpl;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HTestConst;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
043import org.apache.hadoop.hbase.client.AsyncConnection;
044import org.apache.hadoop.hbase.client.AsyncTable;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.ResultScanner;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.filter.Filter;
055import org.apache.hadoop.hbase.filter.FilterBase;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.Threads;
059import org.apache.hadoop.hbase.wal.WAL;
060import org.junit.After;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067
068import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
069import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
070import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
071
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
074
075/**
076 * Here we test to make sure that scans return the expected Results when the server is sending the
077 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
078 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
079 * the client when the server has exceeded the time limit during the processing of the scan. When
080 * the time limit is reached, the server will return to the Client whatever Results it has
081 * accumulated (potentially empty).
082 */
083@Category(LargeTests.class)
084public class TestScannerHeartbeatMessages {
085
086  @ClassRule
087  public static final HBaseClassTestRule CLASS_RULE =
088      HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);
089
090  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
091
092  private static AsyncConnection CONN;
093
094  /**
095   * Table configuration
096   */
097  private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
098
099  private static int NUM_ROWS = 5;
100  private static byte[] ROW = Bytes.toBytes("testRow");
101  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
102
103  private static int NUM_FAMILIES = 4;
104  private static byte[] FAMILY = Bytes.toBytes("testFamily");
105  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
106
107  private static int NUM_QUALIFIERS = 3;
108  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
109  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
110
111  private static int VALUE_SIZE = 128;
112  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
113
114  // The time limit should be based on the rpc timeout at client, or the client will regards
115  // the request as timeout before server return a heartbeat.
116  private static int SERVER_TIMEOUT = 60000;
117
118  // Time, in milliseconds, that the client will wait for a response from the server before timing
119  // out. This value is used server side to determine when it is necessary to send a heartbeat
120  // message to the client. Time limit will be 500 ms.
121  private static int CLIENT_TIMEOUT = 1000;
122
123  // In this test, we sleep after reading each row. So we should make sure after we get some number
124  // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
125  private static int DEFAULT_ROW_SLEEP_TIME = 300;
126
127  // Similar with row sleep time.
128  private static int DEFAULT_CF_SLEEP_TIME = 300;
129
130  @BeforeClass
131  public static void setUpBeforeClass() throws Exception {
132    Configuration conf = TEST_UTIL.getConfiguration();
133
134    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
135    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
136    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
137    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
138    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
139
140    // Check the timeout condition after every cell
141    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
142    TEST_UTIL.startMiniCluster(1);
143
144    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
145
146    Configuration newConf = new Configuration(conf);
147    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
148    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
149    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
150  }
151
152  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
153      byte[] cellValue) throws IOException {
154    Table ht = TEST_UTIL.createTable(name, families);
155    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
156    ht.put(puts);
157  }
158
159  /**
160   * Make puts to put the input value into each combination of row, family, and qualifier
161   */
162  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
163      byte[] value) throws IOException {
164    Put put;
165    ArrayList<Put> puts = new ArrayList<>();
166
167    for (int row = 0; row < rows.length; row++) {
168      put = new Put(rows[row]);
169      for (int fam = 0; fam < families.length; fam++) {
170        for (int qual = 0; qual < qualifiers.length; qual++) {
171          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
172          put.add(kv);
173        }
174      }
175      puts.add(put);
176    }
177
178    return puts;
179  }
180
181  @AfterClass
182  public static void tearDownAfterClass() throws Exception {
183    Closeables.close(CONN, true);
184    TEST_UTIL.shutdownMiniCluster();
185  }
186
187  @Before
188  public void setupBeforeTest() throws Exception {
189    disableSleeping();
190  }
191
192  @After
193  public void teardownAfterTest() throws Exception {
194    disableSleeping();
195  }
196
197  /**
198   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
199   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
200   * disabled, the test should throw an exception.
201   */
202  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
203    HeartbeatRPCServices.heartbeatsEnabled = true;
204
205    try {
206      testCallable.call();
207    } catch (Exception e) {
208      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
209          + ExceptionUtils.getStackTrace(e));
210    }
211
212    HeartbeatRPCServices.heartbeatsEnabled = false;
213    try {
214      testCallable.call();
215    } catch (Exception e) {
216      return;
217    } finally {
218      HeartbeatRPCServices.heartbeatsEnabled = true;
219    }
220    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
221        + " is not thrown, the test case is not testing the importance of heartbeat messages");
222  }
223
224  /**
225   * Test the case that the time limit for the scan is reached after each full row of cells is
226   * fetched.
227   */
228  @Test
229  public void testHeartbeatBetweenRows() throws Exception {
230    testImportanceOfHeartbeats(new Callable<Void>() {
231
232      @Override
233      public Void call() throws Exception {
234        // Configure the scan so that it can read the entire table in a single RPC. We want to test
235        // the case where a scan stops on the server side due to a time limit
236        Scan scan = new Scan();
237        scan.setMaxResultSize(Long.MAX_VALUE);
238        scan.setCaching(Integer.MAX_VALUE);
239
240        testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
241        return null;
242      }
243    });
244  }
245
246  /**
247   * Test the case that the time limit for scans is reached in between column families
248   */
249  @Test
250  public void testHeartbeatBetweenColumnFamilies() throws Exception {
251    testImportanceOfHeartbeats(new Callable<Void>() {
252      @Override
253      public Void call() throws Exception {
254        // Configure the scan so that it can read the entire table in a single RPC. We want to test
255        // the case where a scan stops on the server side due to a time limit
256        Scan baseScan = new Scan();
257        baseScan.setMaxResultSize(Long.MAX_VALUE);
258        baseScan.setCaching(Integer.MAX_VALUE);
259
260        // Copy the scan before each test. When a scan object is used by a scanner, some of its
261        // fields may be changed such as start row
262        Scan scanCopy = new Scan(baseScan);
263        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
264        scanCopy = new Scan(baseScan);
265        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
266        return null;
267      }
268    });
269  }
270
271  public static class SparseCellFilter extends FilterBase {
272
273    @Override
274    public ReturnCode filterCell(final Cell v) throws IOException {
275      try {
276        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
277      } catch (InterruptedException e) {
278        Thread.currentThread().interrupt();
279      }
280      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
281          : ReturnCode.SKIP;
282    }
283
284    public static Filter parseFrom(final byte[] pbBytes) {
285      return new SparseCellFilter();
286    }
287  }
288
289  public static class SparseRowFilter extends FilterBase {
290
291    @Override
292    public boolean filterRowKey(Cell cell) throws IOException {
293      try {
294        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
295      } catch (InterruptedException e) {
296        Thread.currentThread().interrupt();
297      }
298      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
299    }
300
301    public static Filter parseFrom(final byte[] pbBytes) {
302      return new SparseRowFilter();
303    }
304  }
305
306  /**
307   * Test the case that there is a filter which filters most of cells
308   */
309  @Test
310  public void testHeartbeatWithSparseCellFilter() throws Exception {
311    testImportanceOfHeartbeats(new Callable<Void>() {
312      @Override
313      public Void call() throws Exception {
314        Scan scan = new Scan();
315        scan.setMaxResultSize(Long.MAX_VALUE);
316        scan.setCaching(Integer.MAX_VALUE);
317        scan.setFilter(new SparseCellFilter());
318        try (ScanPerNextResultScanner scanner =
319          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
320          int num = 0;
321          while (scanner.next() != null) {
322            num++;
323          }
324          assertEquals(1, num);
325        }
326
327        scan = new Scan();
328        scan.setMaxResultSize(Long.MAX_VALUE);
329        scan.setCaching(Integer.MAX_VALUE);
330        scan.setFilter(new SparseCellFilter());
331        scan.setAllowPartialResults(true);
332        try (ScanPerNextResultScanner scanner =
333          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
334          int num = 0;
335          while (scanner.next() != null) {
336            num++;
337          }
338          assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
339        }
340
341        return null;
342      }
343    });
344  }
345
346  /**
347   * Test the case that there is a filter which filters most of rows
348   */
349  @Test
350  public void testHeartbeatWithSparseRowFilter() throws Exception {
351    testImportanceOfHeartbeats(new Callable<Void>() {
352      @Override
353      public Void call() throws Exception {
354        Scan scan = new Scan();
355        scan.setMaxResultSize(Long.MAX_VALUE);
356        scan.setCaching(Integer.MAX_VALUE);
357        scan.setFilter(new SparseRowFilter());
358        try (ScanPerNextResultScanner scanner =
359          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
360          int num = 0;
361          while (scanner.next() != null) {
362            num++;
363          }
364          assertEquals(1, num);
365        }
366
367        return null;
368      }
369    });
370  }
371
372  /**
373   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
374   * necessary
375   * @param scan The scan configuration being tested
376   * @param rowSleepTime The time to sleep between fetches of row cells
377   * @param cfSleepTime The time to sleep between fetches of column family cells
378   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
379   *          that column family are fetched
380   */
381  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
382      int cfSleepTime, boolean sleepBeforeCf) throws Exception {
383    disableSleeping();
384    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
385    final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
386    final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
387
388    Result r1 = null;
389    Result r2 = null;
390
391    while ((r1 = scanner.next()) != null) {
392      // Enforce the specified sleep conditions during calls to the heartbeat scanner
393      configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
394      r2 = scannerWithHeartbeats.next();
395      disableSleeping();
396
397      assertTrue(r2 != null);
398      try {
399        Result.compareResults(r1, r2);
400      } catch (Exception e) {
401        fail(e.getMessage());
402      }
403    }
404
405    assertTrue(scannerWithHeartbeats.next() == null);
406    scanner.close();
407    scannerWithHeartbeats.close();
408  }
409
410  /**
411   * Helper method for setting the time to sleep between rows and column families. If a sleep time
412   * is negative then that sleep will be disabled
413   */
414  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
415    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
416    HeartbeatHRegion.rowSleepTime = rowSleepTime;
417
418    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
419    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
420    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
421  }
422
423  /**
424   * Disable the sleeping mechanism server side.
425   */
426  private static void disableSleeping() {
427    HeartbeatHRegion.sleepBetweenRows = false;
428    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
429  }
430
431  /**
432   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
433   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
434   */
435  private static class HeartbeatHRegionServer extends HRegionServer {
436    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
437      super(conf);
438    }
439
440    @Override
441    protected RSRpcServices createRpcServices() throws IOException {
442      return new HeartbeatRPCServices(this);
443    }
444  }
445
446  /**
447   * Custom RSRpcServices instance that allows heartbeat support to be toggled
448   */
449  private static class HeartbeatRPCServices extends RSRpcServices {
450    private static volatile boolean heartbeatsEnabled = true;
451
452    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
453      super(rs);
454    }
455
456    @Override
457    public ScanResponse scan(RpcController controller, ScanRequest request)
458        throws ServiceException {
459      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
460      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
461      return super.scan(controller, builder.build());
462    }
463  }
464
465  /**
466   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
467   * between fetches of row Results and/or column family cells. Useful for emulating an instance
468   * where the server is taking a long time to process a client's scan request
469   */
470  private static class HeartbeatHRegion extends HRegion {
471    // Row sleeps occur AFTER each row worth of cells is retrieved.
472    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
473    private static volatile boolean sleepBetweenRows = false;
474
475    // The sleep for column families can be initiated before or after we fetch the cells for the
476    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
477    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
478    // limit will be reached inside RegionScanner after all the cells for a column family have been
479    // retrieved.
480    private static volatile boolean sleepBeforeColumnFamily = false;
481    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
482    private static volatile boolean sleepBetweenColumnFamilies = false;
483
484    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
485        RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
486      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
487    }
488
489    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
490        TableDescriptor htd, RegionServerServices rsServices) {
491      super(fs, wal, confParam, htd, rsServices);
492    }
493
494    private static void columnFamilySleep() {
495      if (sleepBetweenColumnFamilies) {
496        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
497      }
498    }
499
500    private static void rowSleep() {
501      if (sleepBetweenRows) {
502        Threads.sleepWithoutInterrupt(rowSleepTime);
503      }
504    }
505
506    // Instantiate the custom heartbeat region scanners
507    @Override
508    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
509        List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
510      if (scan.isReversed()) {
511        if (scan.getFilter() != null) {
512          scan.getFilter().setReversed(true);
513        }
514        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
515      }
516      return new HeartbeatRegionScanner(scan, additionalScanners, this);
517    }
518  }
519
520  /**
521   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
522   * and/or column family cells
523   */
524  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
525    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
526        HRegion region) throws IOException {
527      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
528    }
529
530    @Override
531    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
532      boolean moreRows = super.nextRaw(outResults, context);
533      HeartbeatHRegion.rowSleep();
534      return moreRows;
535    }
536
537    @Override
538    protected void initializeKVHeap(List<KeyValueScanner> scanners,
539        List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
540      this.storeHeap =
541          new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
542      if (!joinedScanners.isEmpty()) {
543        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
544            (CellComparatorImpl) region.getCellComparator());
545      }
546    }
547  }
548
549  /**
550   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
551   * column family cells
552   */
553  private static class HeartbeatRegionScanner extends RegionScannerImpl {
554    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
555        throws IOException {
556      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
557    }
558
559    @Override
560    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
561      boolean moreRows = super.nextRaw(outResults, context);
562      HeartbeatHRegion.rowSleep();
563      return moreRows;
564    }
565
566    @Override
567    protected void initializeKVHeap(List<KeyValueScanner> scanners,
568        List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
569      this.storeHeap =
570          new HeartbeatKVHeap(scanners, region.getCellComparator());
571      if (!joinedScanners.isEmpty()) {
572        this.joinedHeap =
573            new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
574      }
575    }
576  }
577
578  /**
579   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
580   * cells. Useful for testing
581   */
582  private static final class HeartbeatKVHeap extends KeyValueHeap {
583    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
584        throws IOException {
585      super(scanners, comparator);
586    }
587
588    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
589        throws IOException {
590      super(scanners, comparator);
591    }
592
593    @Override
594    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
595      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
596      boolean moreRows = super.next(result, context);
597      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
598      return moreRows;
599    }
600  }
601
602  /**
603   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
604   * cells.
605   */
606  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
607    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
608        CellComparatorImpl comparator) throws IOException {
609      super(scanners, comparator);
610    }
611
612    @Override
613    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
614      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
615      boolean moreRows = super.next(result, context);
616      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
617      return moreRows;
618    }
619  }
620}