001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.Callable;
030import org.apache.commons.lang3.exception.ExceptionUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellComparator;
036import org.apache.hadoop.hbase.CellComparatorImpl;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HTestConst;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
045import org.apache.hadoop.hbase.client.AsyncConnection;
046import org.apache.hadoop.hbase.client.AsyncTable;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.Result;
051import org.apache.hadoop.hbase.client.ResultScanner;
052import org.apache.hadoop.hbase.client.Scan;
053import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.filter.Filter;
057import org.apache.hadoop.hbase.filter.FilterBase;
058import org.apache.hadoop.hbase.ipc.HBaseRpcController;
059import org.apache.hadoop.hbase.ipc.RpcCall;
060import org.apache.hadoop.hbase.testclassification.LargeTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
063import org.apache.hadoop.hbase.util.Threads;
064import org.apache.hadoop.hbase.wal.WAL;
065import org.junit.jupiter.api.AfterAll;
066import org.junit.jupiter.api.AfterEach;
067import org.junit.jupiter.api.BeforeAll;
068import org.junit.jupiter.api.BeforeEach;
069import org.junit.jupiter.api.Tag;
070import org.junit.jupiter.api.Test;
071import org.mockito.Mockito;
072
073import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
074import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
075import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
076
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
079
080/**
081 * Here we test to make sure that scans return the expected Results when the server is sending the
082 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
083 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
084 * the client when the server has exceeded the time limit during the processing of the scan. When
085 * the time limit is reached, the server will return to the Client whatever Results it has
086 * accumulated (potentially empty).
087 */
088@Tag(LargeTests.TAG)
089public class TestScannerHeartbeatMessages {
090
091  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
092
093  private static AsyncConnection CONN;
094
095  /**
096   * Table configuration
097   */
098  private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
099
100  private static int NUM_ROWS = 5;
101  private static byte[] ROW = Bytes.toBytes("testRow");
102  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
103
104  private static int NUM_FAMILIES = 4;
105  private static byte[] FAMILY = Bytes.toBytes("testFamily");
106  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
107
108  private static int NUM_QUALIFIERS = 3;
109  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
110  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
111
112  private static int VALUE_SIZE = 128;
113  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
114
115  // The time limit should be based on the rpc timeout at client, or the client will regards
116  // the request as timeout before server return a heartbeat.
117  private static int SERVER_TIMEOUT = 60000;
118
119  // Time, in milliseconds, that the client will wait for a response from the server before timing
120  // out. This value is used server side to determine when it is necessary to send a heartbeat
121  // message to the client. Time limit will be 500 ms.
122  private static int CLIENT_TIMEOUT = 1000;
123
124  // In this test, we sleep after reading each row. So we should make sure after we get some number
125  // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
126  private static int DEFAULT_ROW_SLEEP_TIME = 300;
127
128  // Similar with row sleep time.
129  private static int DEFAULT_CF_SLEEP_TIME = 300;
130
131  @BeforeAll
132  public static void setUpBeforeClass() throws Exception {
133    Configuration conf = TEST_UTIL.getConfiguration();
134
135    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
136    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
137    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
138    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
139    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
140
141    // Check the timeout condition after every cell
142    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
143    TEST_UTIL.startMiniCluster(1);
144
145    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
146
147    Configuration newConf = new Configuration(conf);
148    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
149    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
150    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
151  }
152
153  @Test
154  public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException {
155    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
156    RSRpcServices services = new RSRpcServices(rs);
157    RpcCall mockRpcCall = Mockito.mock(RpcCall.class);
158    // first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing)
159    // finally, 25 is fatal levels of queueing -- exceeding timeout
160    when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L);
161
162    // assume timeout of 100ms
163    HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class);
164    when(mockController.getCallTimeout()).thenReturn(100);
165
166    // current time is 100, which we'll subtract from 90 and 50 to generate some time deltas
167    EnvironmentEdgeManager.injectEdge(() -> 200L);
168
169    try {
170      // we queued for 20ms, leaving 80ms of timeout, which we divide by 2
171      assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
172      // we queued for 80ms, leaving 20ms of timeout, which we divide by 2
173      assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
174      // we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum
175      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
176        services.getTimeLimit(mockRpcCall, mockController, true));
177      // lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop
178      // timed out calls in the queue. in this case we still fallback on default minimum for now.
179      assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
180        services.getTimeLimit(mockRpcCall, mockController, true));
181    } finally {
182      EnvironmentEdgeManager.reset();
183    }
184
185  }
186
187  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
188    byte[] cellValue) throws IOException {
189    Table ht = TEST_UTIL.createTable(name, families);
190    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
191    ht.put(puts);
192  }
193
194  /**
195   * Make puts to put the input value into each combination of row, family, and qualifier
196   */
197  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
198    byte[] value) throws IOException {
199    Put put;
200    ArrayList<Put> puts = new ArrayList<>();
201
202    for (int row = 0; row < rows.length; row++) {
203      put = new Put(rows[row]);
204      for (int fam = 0; fam < families.length; fam++) {
205        for (int qual = 0; qual < qualifiers.length; qual++) {
206          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
207          put.add(kv);
208        }
209      }
210      puts.add(put);
211    }
212
213    return puts;
214  }
215
216  @AfterAll
217  public static void tearDownAfterClass() throws Exception {
218    Closeables.close(CONN, true);
219    TEST_UTIL.shutdownMiniCluster();
220  }
221
222  @BeforeEach
223  public void setupBeforeTest() throws Exception {
224    disableSleeping();
225  }
226
227  @AfterEach
228  public void teardownAfterTest() throws Exception {
229    disableSleeping();
230  }
231
232  /**
233   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
234   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
235   * disabled, the test should throw an exception.
236   */
237  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
238    HeartbeatRPCServices.heartbeatsEnabled = true;
239
240    try {
241      testCallable.call();
242    } catch (Exception e) {
243      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
244        + ExceptionUtils.getStackTrace(e));
245    }
246
247    HeartbeatRPCServices.heartbeatsEnabled = false;
248    try {
249      testCallable.call();
250    } catch (Exception e) {
251      return;
252    } finally {
253      HeartbeatRPCServices.heartbeatsEnabled = true;
254    }
255    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
256      + " is not thrown, the test case is not testing the importance of heartbeat messages");
257  }
258
259  /**
260   * Test the case that the time limit for the scan is reached after each full row of cells is
261   * fetched.
262   */
263  @Test
264  public void testHeartbeatBetweenRows() throws Exception {
265    testImportanceOfHeartbeats(new Callable<Void>() {
266
267      @Override
268      public Void call() throws Exception {
269        // Configure the scan so that it can read the entire table in a single RPC. We want to test
270        // the case where a scan stops on the server side due to a time limit
271        Scan scan = new Scan();
272        scan.setMaxResultSize(Long.MAX_VALUE);
273        scan.setCaching(Integer.MAX_VALUE);
274
275        testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
276        return null;
277      }
278    });
279  }
280
281  /**
282   * Test the case that the time limit for scans is reached in between column families
283   */
284  @Test
285  public void testHeartbeatBetweenColumnFamilies() throws Exception {
286    testImportanceOfHeartbeats(new Callable<Void>() {
287      @Override
288      public Void call() throws Exception {
289        // Configure the scan so that it can read the entire table in a single RPC. We want to test
290        // the case where a scan stops on the server side due to a time limit
291        Scan baseScan = new Scan();
292        baseScan.setMaxResultSize(Long.MAX_VALUE);
293        baseScan.setCaching(Integer.MAX_VALUE);
294
295        // Copy the scan before each test. When a scan object is used by a scanner, some of its
296        // fields may be changed such as start row
297        Scan scanCopy = new Scan(baseScan);
298        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
299        scanCopy = new Scan(baseScan);
300        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
301        return null;
302      }
303    });
304  }
305
306  public static class SparseCellFilter extends FilterBase {
307
308    @Override
309    public ReturnCode filterCell(final Cell v) throws IOException {
310      try {
311        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
312      } catch (InterruptedException e) {
313        Thread.currentThread().interrupt();
314      }
315      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1])
316        ? ReturnCode.INCLUDE
317        : ReturnCode.SKIP;
318    }
319
320    public static Filter parseFrom(final byte[] pbBytes) {
321      return new SparseCellFilter();
322    }
323  }
324
325  public static class SparseRowFilter extends FilterBase {
326
327    @Override
328    public boolean filterRowKey(Cell cell) throws IOException {
329      try {
330        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
331      } catch (InterruptedException e) {
332        Thread.currentThread().interrupt();
333      }
334      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
335    }
336
337    public static Filter parseFrom(final byte[] pbBytes) {
338      return new SparseRowFilter();
339    }
340  }
341
342  /**
343   * Test the case that there is a filter which filters most of cells
344   */
345  @Test
346  public void testHeartbeatWithSparseCellFilter() throws Exception {
347    testImportanceOfHeartbeats(new Callable<Void>() {
348      @Override
349      public Void call() throws Exception {
350        Scan scan = new Scan();
351        scan.setMaxResultSize(Long.MAX_VALUE);
352        scan.setCaching(Integer.MAX_VALUE);
353        scan.setFilter(new SparseCellFilter());
354        try (ScanPerNextResultScanner scanner =
355          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
356          int num = 0;
357          while (scanner.next() != null) {
358            num++;
359          }
360          assertEquals(1, num);
361        }
362
363        scan = new Scan();
364        scan.setMaxResultSize(Long.MAX_VALUE);
365        scan.setCaching(Integer.MAX_VALUE);
366        scan.setFilter(new SparseCellFilter());
367        scan.setAllowPartialResults(true);
368        try (ScanPerNextResultScanner scanner =
369          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
370          int num = 0;
371          while (scanner.next() != null) {
372            num++;
373          }
374          assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
375        }
376
377        return null;
378      }
379    });
380  }
381
382  /**
383   * Test the case that there is a filter which filters most of rows
384   */
385  @Test
386  public void testHeartbeatWithSparseRowFilter() throws Exception {
387    testImportanceOfHeartbeats(new Callable<Void>() {
388      @Override
389      public Void call() throws Exception {
390        Scan scan = new Scan();
391        scan.setMaxResultSize(Long.MAX_VALUE);
392        scan.setCaching(Integer.MAX_VALUE);
393        scan.setFilter(new SparseRowFilter());
394        try (ScanPerNextResultScanner scanner =
395          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
396          int num = 0;
397          while (scanner.next() != null) {
398            num++;
399          }
400          assertEquals(1, num);
401        }
402
403        return null;
404      }
405    });
406  }
407
408  /**
409   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
410   * necessary
411   * @param scan          The scan configuration being tested
412   * @param rowSleepTime  The time to sleep between fetches of row cells
413   * @param cfSleepTime   The time to sleep between fetches of column family cells
414   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
415   *                      that column family are fetched
416   */
417  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
418    int cfSleepTime, boolean sleepBeforeCf) throws Exception {
419    disableSleeping();
420    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
421    final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
422    final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
423
424    Result r1 = null;
425    Result r2 = null;
426
427    while ((r1 = scanner.next()) != null) {
428      // Enforce the specified sleep conditions during calls to the heartbeat scanner
429      configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
430      r2 = scannerWithHeartbeats.next();
431      disableSleeping();
432
433      assertTrue(r2 != null);
434      try {
435        Result.compareResults(r1, r2);
436      } catch (Exception e) {
437        fail(e.getMessage());
438      }
439    }
440
441    assertTrue(scannerWithHeartbeats.next() == null);
442    scanner.close();
443    scannerWithHeartbeats.close();
444  }
445
446  /**
447   * Helper method for setting the time to sleep between rows and column families. If a sleep time
448   * is negative then that sleep will be disabled
449   */
450  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
451    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
452    HeartbeatHRegion.rowSleepTime = rowSleepTime;
453
454    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
455    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
456    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
457  }
458
459  /**
460   * Disable the sleeping mechanism server side.
461   */
462  private static void disableSleeping() {
463    HeartbeatHRegion.sleepBetweenRows = false;
464    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
465  }
466
467  /**
468   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
469   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
470   */
471  private static class HeartbeatHRegionServer extends HRegionServer {
472    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
473      super(conf);
474    }
475
476    @Override
477    protected RSRpcServices createRpcServices() throws IOException {
478      return new HeartbeatRPCServices(this);
479    }
480  }
481
482  /**
483   * Custom RSRpcServices instance that allows heartbeat support to be toggled
484   */
485  private static class HeartbeatRPCServices extends RSRpcServices {
486    private static volatile boolean heartbeatsEnabled = true;
487
488    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
489      super(rs);
490    }
491
492    @Override
493    public ScanResponse scan(RpcController controller, ScanRequest request)
494      throws ServiceException {
495      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
496      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
497      return super.scan(controller, builder.build());
498    }
499  }
500
501  /**
502   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
503   * between fetches of row Results and/or column family cells. Useful for emulating an instance
504   * where the server is taking a long time to process a client's scan request
505   */
506  private static class HeartbeatHRegion extends HRegion {
507    // Row sleeps occur AFTER each row worth of cells is retrieved.
508    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
509    private static volatile boolean sleepBetweenRows = false;
510
511    // The sleep for column families can be initiated before or after we fetch the cells for the
512    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
513    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
514    // limit will be reached inside RegionScanner after all the cells for a column family have been
515    // retrieved.
516    private static volatile boolean sleepBeforeColumnFamily = false;
517    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
518    private static volatile boolean sleepBetweenColumnFamilies = false;
519
520    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
521      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
522      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
523    }
524
525    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
526      TableDescriptor htd, RegionServerServices rsServices) {
527      super(fs, wal, confParam, htd, rsServices);
528    }
529
530    private static void columnFamilySleep() {
531      if (sleepBetweenColumnFamilies) {
532        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
533      }
534    }
535
536    private static void rowSleep() {
537      if (sleepBetweenRows) {
538        Threads.sleepWithoutInterrupt(rowSleepTime);
539      }
540    }
541
542    // Instantiate the custom heartbeat region scanners
543    @Override
544    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
545      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
546      if (scan.isReversed()) {
547        if (scan.getFilter() != null) {
548          scan.getFilter().setReversed(true);
549        }
550        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
551      }
552      return new HeartbeatRegionScanner(scan, additionalScanners, this);
553    }
554  }
555
556  /**
557   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
558   * and/or column family cells
559   */
560  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
561    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
562      HRegion region) throws IOException {
563      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
564    }
565
566    @Override
567    public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext context)
568      throws IOException {
569      boolean moreRows = super.nextRaw(outResults, context);
570      HeartbeatHRegion.rowSleep();
571      return moreRows;
572    }
573
574    @Override
575    protected void initializeKVHeap(List<KeyValueScanner> scanners,
576      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
577      this.storeHeap =
578        new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
579      if (!joinedScanners.isEmpty()) {
580        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
581          (CellComparatorImpl) region.getCellComparator());
582      }
583    }
584  }
585
586  /**
587   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
588   * column family cells
589   */
590  private static class HeartbeatRegionScanner extends RegionScannerImpl {
591    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
592      throws IOException {
593      super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
594    }
595
596    @Override
597    public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext context)
598      throws IOException {
599      boolean moreRows = super.nextRaw(outResults, context);
600      HeartbeatHRegion.rowSleep();
601      return moreRows;
602    }
603
604    @Override
605    protected void initializeKVHeap(List<KeyValueScanner> scanners,
606      List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
607      this.storeHeap = new HeartbeatKVHeap(scanners, region.getCellComparator());
608      if (!joinedScanners.isEmpty()) {
609        this.joinedHeap = new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
610      }
611    }
612  }
613
614  /**
615   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
616   * cells. Useful for testing
617   */
618  private static final class HeartbeatKVHeap extends KeyValueHeap {
619    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
620      throws IOException {
621      super(scanners, comparator);
622    }
623
624    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
625      throws IOException {
626      super(scanners, comparator);
627    }
628
629    @Override
630    public boolean next(List<? super ExtendedCell> result, ScannerContext context)
631      throws IOException {
632      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
633      boolean moreRows = super.next(result, context);
634      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
635      return moreRows;
636    }
637  }
638
639  /**
640   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
641   * cells.
642   */
643  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
644    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
645      CellComparatorImpl comparator) throws IOException {
646      super(scanners, comparator);
647    }
648
649    @Override
650    public boolean next(List<? super ExtendedCell> result, ScannerContext context)
651      throws IOException {
652      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
653      boolean moreRows = super.next(result, context);
654      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
655      return moreRows;
656    }
657  }
658}