001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.Callable;
030import org.apache.commons.lang3.exception.ExceptionUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellComparator;
036import org.apache.hadoop.hbase.CellComparatorImpl;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HTestConst;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.ResultScanner;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.Table;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.filter.Filter;
054import org.apache.hadoop.hbase.filter.FilterBase;
055import org.apache.hadoop.hbase.ipc.HBaseRpcController;
056import org.apache.hadoop.hbase.ipc.RpcCall;
057import org.apache.hadoop.hbase.testclassification.LargeTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.Threads;
061import org.apache.hadoop.hbase.wal.WAL;
062import org.junit.After;
063import org.junit.AfterClass;
064import org.junit.Before;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.mockito.Mockito;
070
071import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
072import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
073
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
076
077/**
078 * Here we test to make sure that scans return the expected Results when the server is sending the
079 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
080 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
081 * the client when the server has exceeded the time limit during the processing of the scan. When
082 * the time limit is reached, the server will return to the Client whatever Results it has
083 * accumulated (potentially empty).
084 */
085@Category(LargeTests.class)
086public class TestScannerHeartbeatMessages {
087
088  @ClassRule
089  public static final HBaseClassTestRule CLASS_RULE =
090    HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);
091
092  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
093
094  private static Table TABLE = null;
095  private static Connection CONN = null;
096
097  /**
098   * Table configuration
099   */
100  private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
101
102  private static int NUM_ROWS = 5;
103  private static byte[] ROW = Bytes.toBytes("testRow");
104  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
105
106  private static int NUM_FAMILIES = 4;
107  private static byte[] FAMILY = Bytes.toBytes("testFamily");
108  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
109
110  private static int NUM_QUALIFIERS = 3;
111  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
112  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
113
114  private static int VALUE_SIZE = 128;
115  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
116
117  // The time limit should be based on the rpc timeout at client, or the client will regards
118  // the request as timeout before server return a heartbeat.
119  private static int SERVER_TIMEOUT = 60000;
120
121  // Time, in milliseconds, that the client will wait for a response from the server before timing
122  // out. This value is used server side to determine when it is necessary to send a heartbeat
123  // message to the client. Time limit will be 500 ms.
124  private static int CLIENT_TIMEOUT = 1000;
125
126  // In this test, we sleep after reading each row. So we should make sure after we get some number
127  // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
128  private static int DEFAULT_ROW_SLEEP_TIME = 300;
129
130  // Similar with row sleep time.
131  private static int DEFAULT_CF_SLEEP_TIME = 300;
132
133  @BeforeClass
134  public static void setUpBeforeClass() throws Exception {
135    Configuration conf = TEST_UTIL.getConfiguration();
136
137    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
138    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
139    // setting these here for usage on the server side. will override for client side below
140    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
141    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
142    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
143
144    // Check the timeout condition after every cell
145    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
146    TEST_UTIL.startMiniCluster(1);
147
148    // set client timeout for client side, we want it to be less than server side.
149    Configuration clientConf = new Configuration(conf);
150    clientConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
151    CONN = ConnectionFactory.createConnection(clientConf);
152    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
153  }
154
155  @Test
156  public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException {
157    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
158    RSRpcServices services = new RSRpcServices(rs);
159    RpcCall mockRpcCall = Mockito.mock(RpcCall.class);
160    // first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing)
161    // finally, 25 is fatal levels of queueing -- exceeding timeout
162    when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L);
163
164    // assume timeout of 100ms
165    HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class);
166    when(mockController.getCallTimeout()).thenReturn(100);
167
168    // current time is 100, which we'll subtract from 90 and 50 to generate some time deltas
169    EnvironmentEdgeManager.injectEdge(() -> 200L);
170
171    try {
172      // we queued for 20ms, leaving 80ms of timeout, which we divide by 2
173      assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
174      // we queued for 80ms, leaving 20ms of timeout, which we divide by 2
175      assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
176      // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum
177      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
178        services.getTimeLimit(mockRpcCall, mockController, true));
179      // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop
180      // timed out calls in the queue. in this case we still fallback on default minimum for now.
181      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
182        services.getTimeLimit(mockRpcCall, mockController, true));
183    } finally {
184      EnvironmentEdgeManager.reset();
185    }
186
187  }
188
189  static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
190    byte[][] qualifiers, byte[] cellValue) throws IOException {
191    TEST_UTIL.createTable(name, families);
192    Table ht = CONN.getTable(name);
193    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
194    ht.put(puts);
195    return ht;
196  }
197
198  /**
199   * Make puts to put the input value into each combination of row, family, and qualifier
200   */
201  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
202    byte[] value) throws IOException {
203    Put put;
204    ArrayList<Put> puts = new ArrayList<>();
205
206    for (int row = 0; row < rows.length; row++) {
207      put = new Put(rows[row]);
208      for (int fam = 0; fam < families.length; fam++) {
209        for (int qual = 0; qual < qualifiers.length; qual++) {
210          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
211          put.add(kv);
212        }
213      }
214      puts.add(put);
215    }
216
217    return puts;
218  }
219
220  @AfterClass
221  public static void tearDownAfterClass() throws Exception {
222    CONN.close();
223    TEST_UTIL.shutdownMiniCluster();
224  }
225
226  @Before
227  public void setupBeforeTest() throws Exception {
228    disableSleeping();
229  }
230
231  @After
232  public void teardownAfterTest() throws Exception {
233    disableSleeping();
234  }
235
236  /**
237   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
238   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
239   * disabled, the test should throw an exception.
240   */
241  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
242    HeartbeatRPCServices.heartbeatsEnabled = true;
243
244    try {
245      testCallable.call();
246    } catch (Exception e) {
247      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
248        + ExceptionUtils.getStackTrace(e));
249    }
250
251    HeartbeatRPCServices.heartbeatsEnabled = false;
252    try {
253      testCallable.call();
254    } catch (Exception e) {
255      return;
256    } finally {
257      HeartbeatRPCServices.heartbeatsEnabled = true;
258    }
259    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
260      + " is not thrown, the test case is not testing the importance of heartbeat messages");
261  }
262
263  /**
264   * Test the case that the time limit for the scan is reached after each full row of cells is
265   * fetched.
266   */
267  @Test
268  public void testHeartbeatBetweenRows() throws Exception {
269    testImportanceOfHeartbeats(new Callable<Void>() {
270
271      @Override
272      public Void call() throws Exception {
273        // Configure the scan so that it can read the entire table in a single RPC. We want to test
274        // the case where a scan stops on the server side due to a time limit
275        Scan scan = new Scan();
276        scan.setMaxResultSize(Long.MAX_VALUE);
277        scan.setCaching(Integer.MAX_VALUE);
278
279        testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
280        return null;
281      }
282    });
283  }
284
285  /**
286   * Test the case that the time limit for scans is reached in between column families
287   */
288  @Test
289  public void testHeartbeatBetweenColumnFamilies() throws Exception {
290    testImportanceOfHeartbeats(new Callable<Void>() {
291      @Override
292      public Void call() throws Exception {
293        // Configure the scan so that it can read the entire table in a single RPC. We want to test
294        // the case where a scan stops on the server side due to a time limit
295        Scan baseScan = new Scan();
296        baseScan.setMaxResultSize(Long.MAX_VALUE);
297        baseScan.setCaching(Integer.MAX_VALUE);
298
299        // Copy the scan before each test. When a scan object is used by a scanner, some of its
300        // fields may be changed such as start row
301        Scan scanCopy = new Scan(baseScan);
302        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
303        scanCopy = new Scan(baseScan);
304        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
305        return null;
306      }
307    });
308  }
309
310  public static class SparseCellFilter extends FilterBase {
311
312    @Override
313    public ReturnCode filterCell(final Cell v) throws IOException {
314      try {
315        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
316      } catch (InterruptedException e) {
317        Thread.currentThread().interrupt();
318      }
319      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1])
320        ? ReturnCode.INCLUDE
321        : ReturnCode.SKIP;
322    }
323
324    public static Filter parseFrom(final byte[] pbBytes) {
325      return new SparseCellFilter();
326    }
327  }
328
329  public static class SparseRowFilter extends FilterBase {
330
331    @Override
332    public boolean filterRowKey(Cell cell) throws IOException {
333      try {
334        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
335      } catch (InterruptedException e) {
336        Thread.currentThread().interrupt();
337      }
338      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
339    }
340
341    public static Filter parseFrom(final byte[] pbBytes) {
342      return new SparseRowFilter();
343    }
344  }
345
346  /**
347   * Test the case that there is a filter which filters most of cells
348   */
349  @Test
350  public void testHeartbeatWithSparseCellFilter() throws Exception {
351    testImportanceOfHeartbeats(new Callable<Void>() {
352      @Override
353      public Void call() throws Exception {
354        Scan scan = new Scan();
355        scan.setMaxResultSize(Long.MAX_VALUE);
356        scan.setCaching(Integer.MAX_VALUE);
357        scan.setFilter(new SparseCellFilter());
358        ResultScanner scanner = TABLE.getScanner(scan);
359        int num = 0;
360        while (scanner.next() != null) {
361          num++;
362        }
363        assertEquals(1, num);
364        scanner.close();
365
366        scan = new Scan();
367        scan.setMaxResultSize(Long.MAX_VALUE);
368        scan.setCaching(Integer.MAX_VALUE);
369        scan.setFilter(new SparseCellFilter());
370        scan.setAllowPartialResults(true);
371        scanner = TABLE.getScanner(scan);
372        num = 0;
373        while (scanner.next() != null) {
374          num++;
375        }
376        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
377        scanner.close();
378
379        return null;
380      }
381    });
382  }
383
384  /**
385   * Test the case that there is a filter which filters most of rows
386   */
387  @Test
388  public void testHeartbeatWithSparseRowFilter() throws Exception {
389    testImportanceOfHeartbeats(new Callable<Void>() {
390      @Override
391      public Void call() throws Exception {
392        Scan scan = new Scan();
393        scan.setMaxResultSize(Long.MAX_VALUE);
394        scan.setCaching(Integer.MAX_VALUE);
395        scan.setFilter(new SparseRowFilter());
396        ResultScanner scanner = TABLE.getScanner(scan);
397        int num = 0;
398        while (scanner.next() != null) {
399          num++;
400        }
401        assertEquals(1, num);
402        scanner.close();
403
404        return null;
405      }
406    });
407  }
408
409  /**
410   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
411   * necessary
412   * @param scan          The scan configuration being tested
413   * @param rowSleepTime  The time to sleep between fetches of row cells
414   * @param cfSleepTime   The time to sleep between fetches of column family cells
415   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
416   *                      that column family are fetched
417   */
418  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
419    int cfSleepTime, boolean sleepBeforeCf) throws Exception {
420    disableSleeping();
421    final ResultScanner scanner = TABLE.getScanner(scan);
422    final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
423
424    Result r1 = null;
425    Result r2 = null;
426
427    while ((r1 = scanner.next()) != null) {
428      // Enforce the specified sleep conditions during calls to the heartbeat scanner
429      configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
430      r2 = scannerWithHeartbeats.next();
431      disableSleeping();
432
433      assertTrue(r2 != null);
434      try {
435        Result.compareResults(r1, r2);
436      } catch (Exception e) {
437        fail(e.getMessage());
438      }
439    }
440
441    assertTrue(scannerWithHeartbeats.next() == null);
442    scanner.close();
443    scannerWithHeartbeats.close();
444  }
445
446  /**
447   * Helper method for setting the time to sleep between rows and column families. If a sleep time
448   * is negative then that sleep will be disabled
449   */
450  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
451    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
452    HeartbeatHRegion.rowSleepTime = rowSleepTime;
453
454    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
455    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
456    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
457  }
458
459  /**
460   * Disable the sleeping mechanism server side.
461   */
462  private static void disableSleeping() {
463    HeartbeatHRegion.sleepBetweenRows = false;
464    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
465  }
466
467  /**
468   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
469   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
470   */
471  private static class HeartbeatHRegionServer extends HRegionServer {
472    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
473      super(conf);
474    }
475
476    @Override
477    protected RSRpcServices createRpcServices() throws IOException {
478      return new HeartbeatRPCServices(this);
479    }
480  }
481
482  /**
483   * Custom RSRpcServices instance that allows heartbeat support to be toggled
484   */
485  private static class HeartbeatRPCServices extends RSRpcServices {
486    private static volatile boolean heartbeatsEnabled = true;
487
488    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
489      super(rs);
490    }
491
492    @Override
493    public ScanResponse scan(RpcController controller, ScanRequest request)
494      throws ServiceException {
495      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
496      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
497      return super.scan(controller, builder.build());
498    }
499  }
500
501  /**
502   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
503   * between fetches of row Results and/or column family cells. Useful for emulating an instance
504   * where the server is taking a long time to process a client's scan request
505   */
506  private static class HeartbeatHRegion extends HRegion {
507    // Row sleeps occur AFTER each row worth of cells is retrieved.
508    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
509    private static volatile boolean sleepBetweenRows = false;
510
511    // The sleep for column families can be initiated before or after we fetch the cells for the
512    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
513    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
514    // limit will be reached inside RegionScanner after all the cells for a column family have been
515    // retrieved.
516    private static volatile boolean sleepBeforeColumnFamily = false;
517    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
518    private static volatile boolean sleepBetweenColumnFamilies = false;
519
520    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
521      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
522      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
523    }
524
525    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
526      TableDescriptor htd, RegionServerServices rsServices) {
527      super(fs, wal, confParam, htd, rsServices);
528    }
529
530    private static void columnFamilySleep() {
531      if (sleepBetweenColumnFamilies) {
532        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
533      }
534    }
535
536    private static void rowSleep() {
537      if (sleepBetweenRows) {
538        Threads.sleepWithoutInterrupt(rowSleepTime);
539      }
540    }
541
542    // Instantiate the custom heartbeat region scanners
543    @Override
544    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
545      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
546      if (scan.isReversed()) {
547        if (scan.getFilter() != null) {
548          scan.getFilter().setReversed(true);
549        }
550        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
551      }
552      return new HeartbeatRegionScanner(scan, additionalScanners, this);
553    }
554  }
555
556  /**
557   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
558   * and/or column family cells
559   */
560  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
561    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
562      HRegion region) throws IOException {
563      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
564    }
565
566    @Override
567    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
568      boolean moreRows = super.nextRaw(outResults, context);
569      HeartbeatHRegion.rowSleep();
570      return moreRows;
571    }
572
573    @Override
574    protected void initializeKVHeap(List<KeyValueScanner> scanners,
575      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
576      this.storeHeap =
577        new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
578      if (!joinedScanners.isEmpty()) {
579        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
580          (CellComparatorImpl) region.getCellComparator());
581      }
582    }
583  }
584
585  /**
586   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
587   * column family cells
588   */
589  private static class HeartbeatRegionScanner extends RegionScannerImpl {
590    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
591      throws IOException {
592      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
593    }
594
595    @Override
596    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
597      boolean moreRows = super.nextRaw(outResults, context);
598      HeartbeatHRegion.rowSleep();
599      return moreRows;
600    }
601
602    @Override
603    protected void initializeKVHeap(List<KeyValueScanner> scanners,
604      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
605      this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator());
606      if (!joinedScanners.isEmpty()) {
607        this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
608      }
609    }
610  }
611
612  /**
613   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
614   * cells. Useful for testing
615   */
616  private static final class HeartbeatKVHeap extends KeyValueHeap {
617    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
618      throws IOException {
619      super(scanners, comparator);
620    }
621
622    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
623      throws IOException {
624      super(scanners, comparator);
625    }
626
627    @Override
628    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
629      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
630      boolean moreRows = super.next(result, context);
631      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
632      return moreRows;
633    }
634  }
635
636  /**
637   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
638   * cells.
639   */
640  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
641    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
642      CellComparatorImpl comparator) throws IOException {
643      super(scanners, comparator);
644    }
645
646    @Override
647    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
648      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
649      boolean moreRows = super.next(result, context);
650      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
651      return moreRows;
652    }
653  }
654}