001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.Callable;
028import org.apache.commons.lang3.exception.ExceptionUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparator;
034import org.apache.hadoop.hbase.CellComparatorImpl;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HTestConst;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
043import org.apache.hadoop.hbase.client.AsyncConnection;
044import org.apache.hadoop.hbase.client.AsyncTable;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.ResultScanner;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.filter.Filter;
055import org.apache.hadoop.hbase.filter.FilterBase;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.Threads;
059import org.apache.hadoop.hbase.wal.WAL;
060import org.junit.After;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067
068import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
069import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
070import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
071
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
074
075/**
076 * Here we test to make sure that scans return the expected Results when the server is sending the
077 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
078 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
079 * the client when the server has exceeded the time limit during the processing of the scan. When
080 * the time limit is reached, the server will return to the Client whatever Results it has
081 * accumulated (potentially empty).
082 */
083@Category(LargeTests.class)
084public class TestScannerHeartbeatMessages {
085
086  @ClassRule
087  public static final HBaseClassTestRule CLASS_RULE =
088    HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);
089
090  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
091
092  private static AsyncConnection CONN;
093
094  /**
095   * Table configuration
096   */
097  private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
098
099  private static int NUM_ROWS = 5;
100  private static byte[] ROW = Bytes.toBytes("testRow");
101  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
102
103  private static int NUM_FAMILIES = 4;
104  private static byte[] FAMILY = Bytes.toBytes("testFamily");
105  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
106
107  private static int NUM_QUALIFIERS = 3;
108  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
109  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
110
111  private static int VALUE_SIZE = 128;
112  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
113
114  // The time limit should be based on the rpc timeout at client, or the client will regards
115  // the request as timeout before server return a heartbeat.
116  private static int SERVER_TIMEOUT = 60000;
117
118  // Time, in milliseconds, that the client will wait for a response from the server before timing
119  // out. This value is used server side to determine when it is necessary to send a heartbeat
120  // message to the client. Time limit will be 500 ms.
121  private static int CLIENT_TIMEOUT = 1000;
122
123  // In this test, we sleep after reading each row. So we should make sure after we get some number
124  // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
125  private static int DEFAULT_ROW_SLEEP_TIME = 300;
126
127  // Similar with row sleep time.
128  private static int DEFAULT_CF_SLEEP_TIME = 300;
129
130  @BeforeClass
131  public static void setUpBeforeClass() throws Exception {
132    Configuration conf = TEST_UTIL.getConfiguration();
133
134    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
135    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
136    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
137    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
138    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
139
140    // Check the timeout condition after every cell
141    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
142    TEST_UTIL.startMiniCluster(1);
143
144    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
145
146    Configuration newConf = new Configuration(conf);
147    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
148    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
149    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
150  }
151
152  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
153    byte[] cellValue) throws IOException {
154    Table ht = TEST_UTIL.createTable(name, families);
155    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
156    ht.put(puts);
157  }
158
159  /**
160   * Make puts to put the input value into each combination of row, family, and qualifier
161   */
162  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
163    byte[] value) throws IOException {
164    Put put;
165    ArrayList<Put> puts = new ArrayList<>();
166
167    for (int row = 0; row < rows.length; row++) {
168      put = new Put(rows[row]);
169      for (int fam = 0; fam < families.length; fam++) {
170        for (int qual = 0; qual < qualifiers.length; qual++) {
171          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
172          put.add(kv);
173        }
174      }
175      puts.add(put);
176    }
177
178    return puts;
179  }
180
181  @AfterClass
182  public static void tearDownAfterClass() throws Exception {
183    Closeables.close(CONN, true);
184    TEST_UTIL.shutdownMiniCluster();
185  }
186
187  @Before
188  public void setupBeforeTest() throws Exception {
189    disableSleeping();
190  }
191
192  @After
193  public void teardownAfterTest() throws Exception {
194    disableSleeping();
195  }
196
197  /**
198   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
199   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
200   * disabled, the test should throw an exception.
201   */
202  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
203    HeartbeatRPCServices.heartbeatsEnabled = true;
204
205    try {
206      testCallable.call();
207    } catch (Exception e) {
208      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
209        + ExceptionUtils.getStackTrace(e));
210    }
211
212    HeartbeatRPCServices.heartbeatsEnabled = false;
213    try {
214      testCallable.call();
215    } catch (Exception e) {
216      return;
217    } finally {
218      HeartbeatRPCServices.heartbeatsEnabled = true;
219    }
220    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
221      + " is not thrown, the test case is not testing the importance of heartbeat messages");
222  }
223
224  /**
225   * Test the case that the time limit for the scan is reached after each full row of cells is
226   * fetched.
227   */
228  @Test
229  public void testHeartbeatBetweenRows() throws Exception {
230    testImportanceOfHeartbeats(new Callable<Void>() {
231
232      @Override
233      public Void call() throws Exception {
234        // Configure the scan so that it can read the entire table in a single RPC. We want to test
235        // the case where a scan stops on the server side due to a time limit
236        Scan scan = new Scan();
237        scan.setMaxResultSize(Long.MAX_VALUE);
238        scan.setCaching(Integer.MAX_VALUE);
239
240        testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
241        return null;
242      }
243    });
244  }
245
246  /**
247   * Test the case that the time limit for scans is reached in between column families
248   */
249  @Test
250  public void testHeartbeatBetweenColumnFamilies() throws Exception {
251    testImportanceOfHeartbeats(new Callable<Void>() {
252      @Override
253      public Void call() throws Exception {
254        // Configure the scan so that it can read the entire table in a single RPC. We want to test
255        // the case where a scan stops on the server side due to a time limit
256        Scan baseScan = new Scan();
257        baseScan.setMaxResultSize(Long.MAX_VALUE);
258        baseScan.setCaching(Integer.MAX_VALUE);
259
260        // Copy the scan before each test. When a scan object is used by a scanner, some of its
261        // fields may be changed such as start row
262        Scan scanCopy = new Scan(baseScan);
263        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
264        scanCopy = new Scan(baseScan);
265        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
266        return null;
267      }
268    });
269  }
270
271  public static class SparseCellFilter extends FilterBase {
272
273    @Override
274    public ReturnCode filterCell(final Cell v) throws IOException {
275      try {
276        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
277      } catch (InterruptedException e) {
278        Thread.currentThread().interrupt();
279      }
280      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1])
281        ? ReturnCode.INCLUDE
282        : ReturnCode.SKIP;
283    }
284
285    public static Filter parseFrom(final byte[] pbBytes) {
286      return new SparseCellFilter();
287    }
288  }
289
290  public static class SparseRowFilter extends FilterBase {
291
292    @Override
293    public boolean filterRowKey(Cell cell) throws IOException {
294      try {
295        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
296      } catch (InterruptedException e) {
297        Thread.currentThread().interrupt();
298      }
299      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
300    }
301
302    public static Filter parseFrom(final byte[] pbBytes) {
303      return new SparseRowFilter();
304    }
305  }
306
307  /**
308   * Test the case that there is a filter which filters most of cells
309   */
310  @Test
311  public void testHeartbeatWithSparseCellFilter() throws Exception {
312    testImportanceOfHeartbeats(new Callable<Void>() {
313      @Override
314      public Void call() throws Exception {
315        Scan scan = new Scan();
316        scan.setMaxResultSize(Long.MAX_VALUE);
317        scan.setCaching(Integer.MAX_VALUE);
318        scan.setFilter(new SparseCellFilter());
319        try (ScanPerNextResultScanner scanner =
320          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
321          int num = 0;
322          while (scanner.next() != null) {
323            num++;
324          }
325          assertEquals(1, num);
326        }
327
328        scan = new Scan();
329        scan.setMaxResultSize(Long.MAX_VALUE);
330        scan.setCaching(Integer.MAX_VALUE);
331        scan.setFilter(new SparseCellFilter());
332        scan.setAllowPartialResults(true);
333        try (ScanPerNextResultScanner scanner =
334          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
335          int num = 0;
336          while (scanner.next() != null) {
337            num++;
338          }
339          assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
340        }
341
342        return null;
343      }
344    });
345  }
346
347  /**
348   * Test the case that there is a filter which filters most of rows
349   */
350  @Test
351  public void testHeartbeatWithSparseRowFilter() throws Exception {
352    testImportanceOfHeartbeats(new Callable<Void>() {
353      @Override
354      public Void call() throws Exception {
355        Scan scan = new Scan();
356        scan.setMaxResultSize(Long.MAX_VALUE);
357        scan.setCaching(Integer.MAX_VALUE);
358        scan.setFilter(new SparseRowFilter());
359        try (ScanPerNextResultScanner scanner =
360          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
361          int num = 0;
362          while (scanner.next() != null) {
363            num++;
364          }
365          assertEquals(1, num);
366        }
367
368        return null;
369      }
370    });
371  }
372
373  /**
374   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
375   * necessary
376   * @param scan          The scan configuration being tested
377   * @param rowSleepTime  The time to sleep between fetches of row cells
378   * @param cfSleepTime   The time to sleep between fetches of column family cells
379   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
380   *                      that column family are fetched
381   */
382  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
383    int cfSleepTime, boolean sleepBeforeCf) throws Exception {
384    disableSleeping();
385    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
386    final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
387    final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
388
389    Result r1 = null;
390    Result r2 = null;
391
392    while ((r1 = scanner.next()) != null) {
393      // Enforce the specified sleep conditions during calls to the heartbeat scanner
394      configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
395      r2 = scannerWithHeartbeats.next();
396      disableSleeping();
397
398      assertTrue(r2 != null);
399      try {
400        Result.compareResults(r1, r2);
401      } catch (Exception e) {
402        fail(e.getMessage());
403      }
404    }
405
406    assertTrue(scannerWithHeartbeats.next() == null);
407    scanner.close();
408    scannerWithHeartbeats.close();
409  }
410
411  /**
412   * Helper method for setting the time to sleep between rows and column families. If a sleep time
413   * is negative then that sleep will be disabled
414   */
415  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
416    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
417    HeartbeatHRegion.rowSleepTime = rowSleepTime;
418
419    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
420    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
421    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
422  }
423
424  /**
425   * Disable the sleeping mechanism server side.
426   */
427  private static void disableSleeping() {
428    HeartbeatHRegion.sleepBetweenRows = false;
429    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
430  }
431
432  /**
433   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
434   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
435   */
436  private static class HeartbeatHRegionServer extends HRegionServer {
437    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
438      super(conf);
439    }
440
441    @Override
442    protected RSRpcServices createRpcServices() throws IOException {
443      return new HeartbeatRPCServices(this);
444    }
445  }
446
447  /**
448   * Custom RSRpcServices instance that allows heartbeat support to be toggled
449   */
450  private static class HeartbeatRPCServices extends RSRpcServices {
451    private static volatile boolean heartbeatsEnabled = true;
452
453    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
454      super(rs);
455    }
456
457    @Override
458    public ScanResponse scan(RpcController controller, ScanRequest request)
459      throws ServiceException {
460      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
461      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
462      return super.scan(controller, builder.build());
463    }
464  }
465
466  /**
467   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
468   * between fetches of row Results and/or column family cells. Useful for emulating an instance
469   * where the server is taking a long time to process a client's scan request
470   */
471  private static class HeartbeatHRegion extends HRegion {
472    // Row sleeps occur AFTER each row worth of cells is retrieved.
473    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
474    private static volatile boolean sleepBetweenRows = false;
475
476    // The sleep for column families can be initiated before or after we fetch the cells for the
477    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
478    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
479    // limit will be reached inside RegionScanner after all the cells for a column family have been
480    // retrieved.
481    private static volatile boolean sleepBeforeColumnFamily = false;
482    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
483    private static volatile boolean sleepBetweenColumnFamilies = false;
484
485    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
486      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
487      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
488    }
489
490    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
491      TableDescriptor htd, RegionServerServices rsServices) {
492      super(fs, wal, confParam, htd, rsServices);
493    }
494
495    private static void columnFamilySleep() {
496      if (sleepBetweenColumnFamilies) {
497        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
498      }
499    }
500
501    private static void rowSleep() {
502      if (sleepBetweenRows) {
503        Threads.sleepWithoutInterrupt(rowSleepTime);
504      }
505    }
506
507    // Instantiate the custom heartbeat region scanners
508    @Override
509    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
510      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
511      if (scan.isReversed()) {
512        if (scan.getFilter() != null) {
513          scan.getFilter().setReversed(true);
514        }
515        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
516      }
517      return new HeartbeatRegionScanner(scan, additionalScanners, this);
518    }
519  }
520
521  /**
522   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
523   * and/or column family cells
524   */
525  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
526    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
527      HRegion region) throws IOException {
528      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
529    }
530
531    @Override
532    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
533      boolean moreRows = super.nextRaw(outResults, context);
534      HeartbeatHRegion.rowSleep();
535      return moreRows;
536    }
537
538    @Override
539    protected void initializeKVHeap(List<KeyValueScanner> scanners,
540      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
541      this.storeHeap =
542        new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
543      if (!joinedScanners.isEmpty()) {
544        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
545          (CellComparatorImpl) region.getCellComparator());
546      }
547    }
548  }
549
550  /**
551   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
552   * column family cells
553   */
554  private static class HeartbeatRegionScanner extends RegionScannerImpl {
555    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
556      throws IOException {
557      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
558    }
559
560    @Override
561    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
562      boolean moreRows = super.nextRaw(outResults, context);
563      HeartbeatHRegion.rowSleep();
564      return moreRows;
565    }
566
567    @Override
568    protected void initializeKVHeap(List<KeyValueScanner> scanners,
569      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
570      this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator());
571      if (!joinedScanners.isEmpty()) {
572        this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
573      }
574    }
575  }
576
577  /**
578   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
579   * cells. Useful for testing
580   */
581  private static final class HeartbeatKVHeap extends KeyValueHeap {
582    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
583      throws IOException {
584      super(scanners, comparator);
585    }
586
587    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
588      throws IOException {
589      super(scanners, comparator);
590    }
591
592    @Override
593    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
594      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
595      boolean moreRows = super.next(result, context);
596      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
597      return moreRows;
598    }
599  }
600
601  /**
602   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
603   * cells.
604   */
605  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
606    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
607      CellComparatorImpl comparator) throws IOException {
608      super(scanners, comparator);
609    }
610
611    @Override
612    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
613      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
614      boolean moreRows = super.next(result, context);
615      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
616      return moreRows;
617    }
618  }
619}