001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.Callable;
030import org.apache.commons.lang3.exception.ExceptionUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellComparator;
036import org.apache.hadoop.hbase.CellComparatorImpl;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HTestConst;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
045import org.apache.hadoop.hbase.client.AsyncConnection;
046import org.apache.hadoop.hbase.client.AsyncTable;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.Result;
051import org.apache.hadoop.hbase.client.ResultScanner;
052import org.apache.hadoop.hbase.client.Scan;
053import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.filter.Filter;
057import org.apache.hadoop.hbase.filter.FilterBase;
058import org.apache.hadoop.hbase.ipc.HBaseRpcController;
059import org.apache.hadoop.hbase.ipc.RpcCall;
060import org.apache.hadoop.hbase.testclassification.LargeTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
063import org.apache.hadoop.hbase.util.Threads;
064import org.apache.hadoop.hbase.wal.WAL;
065import org.junit.After;
066import org.junit.AfterClass;
067import org.junit.Before;
068import org.junit.BeforeClass;
069import org.junit.ClassRule;
070import org.junit.Test;
071import org.junit.experimental.categories.Category;
072import org.mockito.Mockito;
073
074import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
075import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
076import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
077
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
080
081/**
082 * Here we test to make sure that scans return the expected Results when the server is sending the
083 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
084 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
085 * the client when the server has exceeded the time limit during the processing of the scan. When
086 * the time limit is reached, the server will return to the Client whatever Results it has
087 * accumulated (potentially empty).
088 */
089@Category(LargeTests.class)
090public class TestScannerHeartbeatMessages {
091
092  @ClassRule
093  public static final HBaseClassTestRule CLASS_RULE =
094    HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);
095
096  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
097
098  private static AsyncConnection CONN;
099
100  /**
101   * Table configuration
102   */
103  private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
104
105  private static int NUM_ROWS = 5;
106  private static byte[] ROW = Bytes.toBytes("testRow");
107  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
108
109  private static int NUM_FAMILIES = 4;
110  private static byte[] FAMILY = Bytes.toBytes("testFamily");
111  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
112
113  private static int NUM_QUALIFIERS = 3;
114  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
115  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
116
117  private static int VALUE_SIZE = 128;
118  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
119
  // The time limit should be based on the client's rpc timeout; otherwise the client will regard
  // the request as timed out before the server returns a heartbeat.
122  private static int SERVER_TIMEOUT = 60000;
123
124  // Time, in milliseconds, that the client will wait for a response from the server before timing
125  // out. This value is used server side to determine when it is necessary to send a heartbeat
126  // message to the client. Time limit will be 500 ms.
127  private static int CLIENT_TIMEOUT = 1000;
128
  // In this test, we sleep after reading each row. The sleep time must be chosen so that after
  // reading some number of rows (and sleeping that many times) we reach the time limit, but one
  // more sleep does not push us past the client timeout.
131  private static int DEFAULT_ROW_SLEEP_TIME = 300;
132
133  // Similar with row sleep time.
134  private static int DEFAULT_CF_SLEEP_TIME = 300;
135
136  @BeforeClass
137  public static void setUpBeforeClass() throws Exception {
138    Configuration conf = TEST_UTIL.getConfiguration();
139
140    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
141    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
142    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
143    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
144    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
145
146    // Check the timeout condition after every cell
147    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
148    TEST_UTIL.startMiniCluster(1);
149
150    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
151
152    Configuration newConf = new Configuration(conf);
153    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
154    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
155    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
156  }
157
158  @Test
159  public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException {
160    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
161    RSRpcServices services = new RSRpcServices(rs);
162    RpcCall mockRpcCall = Mockito.mock(RpcCall.class);
163    // first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing)
164    // finally, 25 is fatal levels of queueing -- exceeding timeout
165    when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L);
166
167    // assume timeout of 100ms
168    HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class);
169    when(mockController.getCallTimeout()).thenReturn(100);
170
171    // current time is 100, which we'll subtract from 90 and 50 to generate some time deltas
172    EnvironmentEdgeManager.injectEdge(() -> 200L);
173
174    try {
175      // we queued for 20ms, leaving 80ms of timeout, which we divide by 2
176      assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
177      // we queued for 80ms, leaving 20ms of timeout, which we divide by 2
178      assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
179      // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum
180      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
181        services.getTimeLimit(mockRpcCall, mockController, true));
182      // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop
183      // timed out calls in the queue. in this case we still fallback on default minimum for now.
184      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
185        services.getTimeLimit(mockRpcCall, mockController, true));
186    } finally {
187      EnvironmentEdgeManager.reset();
188    }
189
190  }
191
192  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
193    byte[] cellValue) throws IOException {
194    Table ht = TEST_UTIL.createTable(name, families);
195    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
196    ht.put(puts);
197  }
198
199  /**
200   * Make puts to put the input value into each combination of row, family, and qualifier
201   */
202  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
203    byte[] value) throws IOException {
204    Put put;
205    ArrayList<Put> puts = new ArrayList<>();
206
207    for (int row = 0; row < rows.length; row++) {
208      put = new Put(rows[row]);
209      for (int fam = 0; fam < families.length; fam++) {
210        for (int qual = 0; qual < qualifiers.length; qual++) {
211          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
212          put.add(kv);
213        }
214      }
215      puts.add(put);
216    }
217
218    return puts;
219  }
220
  /**
   * Closes the shared client connection and then shuts down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    Closeables.close(CONN, true);
    TEST_UTIL.shutdownMiniCluster();
  }
226
  /**
   * Makes sure every test starts with the server side sleeps switched off.
   */
  @Before
  public void setupBeforeTest() throws Exception {
    disableSleeping();
  }
231
  /**
   * Switches the server side sleeps off again, so a test that ended while sleeps were configured
   * does not leave them enabled.
   */
  @After
  public void teardownAfterTest() throws Exception {
    disableSleeping();
  }
236
237  /**
238   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
239   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
240   * disabled, the test should throw an exception.
241   */
242  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
243    HeartbeatRPCServices.heartbeatsEnabled = true;
244
245    try {
246      testCallable.call();
247    } catch (Exception e) {
248      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
249        + ExceptionUtils.getStackTrace(e));
250    }
251
252    HeartbeatRPCServices.heartbeatsEnabled = false;
253    try {
254      testCallable.call();
255    } catch (Exception e) {
256      return;
257    } finally {
258      HeartbeatRPCServices.heartbeatsEnabled = true;
259    }
260    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
261      + " is not thrown, the test case is not testing the importance of heartbeat messages");
262  }
263
264  /**
265   * Test the case that the time limit for the scan is reached after each full row of cells is
266   * fetched.
267   */
268  @Test
269  public void testHeartbeatBetweenRows() throws Exception {
270    testImportanceOfHeartbeats(new Callable<Void>() {
271
272      @Override
273      public Void call() throws Exception {
274        // Configure the scan so that it can read the entire table in a single RPC. We want to test
275        // the case where a scan stops on the server side due to a time limit
276        Scan scan = new Scan();
277        scan.setMaxResultSize(Long.MAX_VALUE);
278        scan.setCaching(Integer.MAX_VALUE);
279
280        testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
281        return null;
282      }
283    });
284  }
285
286  /**
287   * Test the case that the time limit for scans is reached in between column families
288   */
289  @Test
290  public void testHeartbeatBetweenColumnFamilies() throws Exception {
291    testImportanceOfHeartbeats(new Callable<Void>() {
292      @Override
293      public Void call() throws Exception {
294        // Configure the scan so that it can read the entire table in a single RPC. We want to test
295        // the case where a scan stops on the server side due to a time limit
296        Scan baseScan = new Scan();
297        baseScan.setMaxResultSize(Long.MAX_VALUE);
298        baseScan.setCaching(Integer.MAX_VALUE);
299
300        // Copy the scan before each test. When a scan object is used by a scanner, some of its
301        // fields may be changed such as start row
302        Scan scanCopy = new Scan(baseScan);
303        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
304        scanCopy = new Scan(baseScan);
305        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
306        return null;
307      }
308    });
309  }
310
311  public static class SparseCellFilter extends FilterBase {
312
313    @Override
314    public ReturnCode filterCell(final Cell v) throws IOException {
315      try {
316        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
317      } catch (InterruptedException e) {
318        Thread.currentThread().interrupt();
319      }
320      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1])
321        ? ReturnCode.INCLUDE
322        : ReturnCode.SKIP;
323    }
324
325    public static Filter parseFrom(final byte[] pbBytes) {
326      return new SparseCellFilter();
327    }
328  }
329
330  public static class SparseRowFilter extends FilterBase {
331
332    @Override
333    public boolean filterRowKey(Cell cell) throws IOException {
334      try {
335        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
336      } catch (InterruptedException e) {
337        Thread.currentThread().interrupt();
338      }
339      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
340    }
341
342    public static Filter parseFrom(final byte[] pbBytes) {
343      return new SparseRowFilter();
344    }
345  }
346
347  /**
348   * Test the case that there is a filter which filters most of cells
349   */
350  @Test
351  public void testHeartbeatWithSparseCellFilter() throws Exception {
352    testImportanceOfHeartbeats(new Callable<Void>() {
353      @Override
354      public Void call() throws Exception {
355        Scan scan = new Scan();
356        scan.setMaxResultSize(Long.MAX_VALUE);
357        scan.setCaching(Integer.MAX_VALUE);
358        scan.setFilter(new SparseCellFilter());
359        try (ScanPerNextResultScanner scanner =
360          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
361          int num = 0;
362          while (scanner.next() != null) {
363            num++;
364          }
365          assertEquals(1, num);
366        }
367
368        scan = new Scan();
369        scan.setMaxResultSize(Long.MAX_VALUE);
370        scan.setCaching(Integer.MAX_VALUE);
371        scan.setFilter(new SparseCellFilter());
372        scan.setAllowPartialResults(true);
373        try (ScanPerNextResultScanner scanner =
374          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
375          int num = 0;
376          while (scanner.next() != null) {
377            num++;
378          }
379          assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
380        }
381
382        return null;
383      }
384    });
385  }
386
387  /**
388   * Test the case that there is a filter which filters most of rows
389   */
390  @Test
391  public void testHeartbeatWithSparseRowFilter() throws Exception {
392    testImportanceOfHeartbeats(new Callable<Void>() {
393      @Override
394      public Void call() throws Exception {
395        Scan scan = new Scan();
396        scan.setMaxResultSize(Long.MAX_VALUE);
397        scan.setCaching(Integer.MAX_VALUE);
398        scan.setFilter(new SparseRowFilter());
399        try (ScanPerNextResultScanner scanner =
400          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
401          int num = 0;
402          while (scanner.next() != null) {
403            num++;
404          }
405          assertEquals(1, num);
406        }
407
408        return null;
409      }
410    });
411  }
412
413  /**
414   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
415   * necessary
416   * @param scan          The scan configuration being tested
417   * @param rowSleepTime  The time to sleep between fetches of row cells
418   * @param cfSleepTime   The time to sleep between fetches of column family cells
419   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
420   *                      that column family are fetched
421   */
422  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
423    int cfSleepTime, boolean sleepBeforeCf) throws Exception {
424    disableSleeping();
425    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
426    final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
427    final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
428
429    Result r1 = null;
430    Result r2 = null;
431
432    while ((r1 = scanner.next()) != null) {
433      // Enforce the specified sleep conditions during calls to the heartbeat scanner
434      configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
435      r2 = scannerWithHeartbeats.next();
436      disableSleeping();
437
438      assertTrue(r2 != null);
439      try {
440        Result.compareResults(r1, r2);
441      } catch (Exception e) {
442        fail(e.getMessage());
443      }
444    }
445
446    assertTrue(scannerWithHeartbeats.next() == null);
447    scanner.close();
448    scannerWithHeartbeats.close();
449  }
450
451  /**
452   * Helper method for setting the time to sleep between rows and column families. If a sleep time
453   * is negative then that sleep will be disabled
454   */
455  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
456    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
457    HeartbeatHRegion.rowSleepTime = rowSleepTime;
458
459    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
460    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
461    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
462  }
463
  /**
   * Disable the sleeping mechanism server side. Only the two enable flags are cleared; the
   * configured sleep times and the before/after column family setting are left untouched.
   */
  private static void disableSleeping() {
    HeartbeatHRegion.sleepBetweenRows = false;
    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
  }
471
  /**
   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
   */
  private static class HeartbeatHRegionServer extends HRegionServer {
    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
      super(conf);
    }

    // Hand out the RPC services variant whose heartbeat support can be toggled by the tests
    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new HeartbeatRPCServices(this);
    }
  }
486
487  /**
488   * Custom RSRpcServices instance that allows heartbeat support to be toggled
489   */
490  private static class HeartbeatRPCServices extends RSRpcServices {
491    private static volatile boolean heartbeatsEnabled = true;
492
493    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
494      super(rs);
495    }
496
497    @Override
498    public ScanResponse scan(RpcController controller, ScanRequest request)
499      throws ServiceException {
500      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
501      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
502      return super.scan(controller, builder.build());
503    }
504  }
505
506  /**
507   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
508   * between fetches of row Results and/or column family cells. Useful for emulating an instance
509   * where the server is taking a long time to process a client's scan request
510   */
511  private static class HeartbeatHRegion extends HRegion {
512    // Row sleeps occur AFTER each row worth of cells is retrieved.
513    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
514    private static volatile boolean sleepBetweenRows = false;
515
516    // The sleep for column families can be initiated before or after we fetch the cells for the
517    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
518    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
519    // limit will be reached inside RegionScanner after all the cells for a column family have been
520    // retrieved.
521    private static volatile boolean sleepBeforeColumnFamily = false;
522    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
523    private static volatile boolean sleepBetweenColumnFamilies = false;
524
525    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
526      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
527      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
528    }
529
530    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
531      TableDescriptor htd, RegionServerServices rsServices) {
532      super(fs, wal, confParam, htd, rsServices);
533    }
534
535    private static void columnFamilySleep() {
536      if (sleepBetweenColumnFamilies) {
537        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
538      }
539    }
540
541    private static void rowSleep() {
542      if (sleepBetweenRows) {
543        Threads.sleepWithoutInterrupt(rowSleepTime);
544      }
545    }
546
547    // Instantiate the custom heartbeat region scanners
548    @Override
549    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
550      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
551      if (scan.isReversed()) {
552        if (scan.getFilter() != null) {
553          scan.getFilter().setReversed(true);
554        }
555        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
556      }
557      return new HeartbeatRegionScanner(scan, additionalScanners, this);
558    }
559  }
560
561  /**
562   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
563   * and/or column family cells
564   */
565  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
566    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
567      HRegion region) throws IOException {
568      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
569    }
570
571    @Override
572    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
573      boolean moreRows = super.nextRaw(outResults, context);
574      HeartbeatHRegion.rowSleep();
575      return moreRows;
576    }
577
578    @Override
579    protected void initializeKVHeap(List<KeyValueScanner> scanners,
580      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
581      this.storeHeap =
582        new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
583      if (!joinedScanners.isEmpty()) {
584        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
585          (CellComparatorImpl) region.getCellComparator());
586      }
587    }
588  }
589
590  /**
591   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
592   * column family cells
593   */
594  private static class HeartbeatRegionScanner extends RegionScannerImpl {
595    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
596      throws IOException {
597      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
598    }
599
600    @Override
601    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
602      boolean moreRows = super.nextRaw(outResults, context);
603      HeartbeatHRegion.rowSleep();
604      return moreRows;
605    }
606
607    @Override
608    protected void initializeKVHeap(List<KeyValueScanner> scanners,
609      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
610      this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator());
611      if (!joinedScanners.isEmpty()) {
612        this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
613      }
614    }
615  }
616
617  /**
618   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
619   * cells. Useful for testing
620   */
621  private static final class HeartbeatKVHeap extends KeyValueHeap {
622    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
623      throws IOException {
624      super(scanners, comparator);
625    }
626
627    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
628      throws IOException {
629      super(scanners, comparator);
630    }
631
632    @Override
633    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
634      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
635      boolean moreRows = super.next(result, context);
636      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
637      return moreRows;
638    }
639  }
640
641  /**
642   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
643   * cells.
644   */
645  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
646    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
647      CellComparatorImpl comparator) throws IOException {
648      super(scanners, comparator);
649    }
650
651    @Override
652    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
653      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
654      boolean moreRows = super.next(result, context);
655      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
656      return moreRows;
657    }
658  }
659}