001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.Callable;
028import org.apache.commons.lang3.exception.ExceptionUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparator;
034import org.apache.hadoop.hbase.CellComparatorImpl;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HTestConst;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.filter.Filter;
050import org.apache.hadoop.hbase.filter.FilterBase;
051import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.Threads;
055import org.apache.hadoop.hbase.wal.WAL;
056import org.junit.After;
057import org.junit.AfterClass;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063
064import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
065import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
066
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
069
070/**
071 * Here we test to make sure that scans return the expected Results when the server is sending the
072 * Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
073 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
074 * the client when the server has exceeded the time limit during the processing of the scan. When
075 * the time limit is reached, the server will return to the Client whatever Results it has
076 * accumulated (potentially empty).
077 */
078@Category(MediumTests.class)
079public class TestScannerHeartbeatMessages {
080
081  @ClassRule
082  public static final HBaseClassTestRule CLASS_RULE =
083      HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);
084
085  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
086
087  private static Table TABLE = null;
088
089  /**
090   * Table configuration
091   */
092  private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
093
094  private static int NUM_ROWS = 5;
095  private static byte[] ROW = Bytes.toBytes("testRow");
096  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
097
098  private static int NUM_FAMILIES = 4;
099  private static byte[] FAMILY = Bytes.toBytes("testFamily");
100  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
101
102  private static int NUM_QUALIFIERS = 3;
103  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
104  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
105
106  private static int VALUE_SIZE = 128;
107  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
108
109  // The time limit should be based on the rpc timeout at client, or the client will regards
110  // the request as timeout before server return a heartbeat.
111  private static int SERVER_TIMEOUT = 60000;
112
113  // Time, in milliseconds, that the client will wait for a response from the server before timing
114  // out. This value is used server side to determine when it is necessary to send a heartbeat
115  // message to the client. Time limit will be 500 ms.
116  private static int CLIENT_TIMEOUT = 1000;
117
118  // In this test, we sleep after reading each row. So we should make sure after we get some number
119  // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
120  private static int DEFAULT_ROW_SLEEP_TIME = 300;
121
122  // Similar with row sleep time.
123  private static int DEFAULT_CF_SLEEP_TIME = 300;
124
125  @BeforeClass
126  public static void setUpBeforeClass() throws Exception {
127    Configuration conf = TEST_UTIL.getConfiguration();
128
129    conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
130    conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
131    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
132    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
133    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
134
135    // Check the timeout condition after every cell
136    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
137    TEST_UTIL.startMiniCluster(1);
138
139    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
140  }
141
142  static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
143      byte[][] qualifiers, byte[] cellValue) throws IOException {
144    Table ht = TEST_UTIL.createTable(name, families);
145    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
146    ht.put(puts);
147    ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
148    return ht;
149  }
150
151  /**
152   * Make puts to put the input value into each combination of row, family, and qualifier
153   */
154  static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
155      byte[] value) throws IOException {
156    Put put;
157    ArrayList<Put> puts = new ArrayList<>();
158
159    for (int row = 0; row < rows.length; row++) {
160      put = new Put(rows[row]);
161      for (int fam = 0; fam < families.length; fam++) {
162        for (int qual = 0; qual < qualifiers.length; qual++) {
163          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
164          put.add(kv);
165        }
166      }
167      puts.add(put);
168    }
169
170    return puts;
171  }
172
173  @AfterClass
174  public static void tearDownAfterClass() throws Exception {
175    TEST_UTIL.shutdownMiniCluster();
176  }
177
178  @Before
179  public void setupBeforeTest() throws Exception {
180    disableSleeping();
181  }
182
183  @After
184  public void teardownAfterTest() throws Exception {
185    disableSleeping();
186  }
187
188  /**
189   * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
190   * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
191   * disabled, the test should throw an exception.
192   */
193  private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
194    HeartbeatRPCServices.heartbeatsEnabled = true;
195
196    try {
197      testCallable.call();
198    } catch (Exception e) {
199      fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
200          + ExceptionUtils.getStackTrace(e));
201    }
202
203    HeartbeatRPCServices.heartbeatsEnabled = false;
204    try {
205      testCallable.call();
206    } catch (Exception e) {
207      return;
208    } finally {
209      HeartbeatRPCServices.heartbeatsEnabled = true;
210    }
211    fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
212        + " is not thrown, the test case is not testing the importance of heartbeat messages");
213  }
214
215  /**
216   * Test the case that the time limit for the scan is reached after each full row of cells is
217   * fetched.
218   */
219  @Test
220  public void testHeartbeatBetweenRows() throws Exception {
221    testImportanceOfHeartbeats(new Callable<Void>() {
222
223      @Override
224      public Void call() throws Exception {
225        // Configure the scan so that it can read the entire table in a single RPC. We want to test
226        // the case where a scan stops on the server side due to a time limit
227        Scan scan = new Scan();
228        scan.setMaxResultSize(Long.MAX_VALUE);
229        scan.setCaching(Integer.MAX_VALUE);
230
231        testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
232        return null;
233      }
234    });
235  }
236
237  /**
238   * Test the case that the time limit for scans is reached in between column families
239   */
240  @Test
241  public void testHeartbeatBetweenColumnFamilies() throws Exception {
242    testImportanceOfHeartbeats(new Callable<Void>() {
243      @Override
244      public Void call() throws Exception {
245        // Configure the scan so that it can read the entire table in a single RPC. We want to test
246        // the case where a scan stops on the server side due to a time limit
247        Scan baseScan = new Scan();
248        baseScan.setMaxResultSize(Long.MAX_VALUE);
249        baseScan.setCaching(Integer.MAX_VALUE);
250
251        // Copy the scan before each test. When a scan object is used by a scanner, some of its
252        // fields may be changed such as start row
253        Scan scanCopy = new Scan(baseScan);
254        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
255        scanCopy = new Scan(baseScan);
256        testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
257        return null;
258      }
259    });
260  }
261
262  public static class SparseCellFilter extends FilterBase {
263
264    @Override
265    public ReturnCode filterCell(final Cell v) throws IOException {
266      try {
267        Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
268      } catch (InterruptedException e) {
269        Thread.currentThread().interrupt();
270      }
271      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
272          : ReturnCode.SKIP;
273    }
274
275    public static Filter parseFrom(final byte[] pbBytes) {
276      return new SparseCellFilter();
277    }
278  }
279
280  public static class SparseRowFilter extends FilterBase {
281
282    @Override
283    public boolean filterRowKey(Cell cell) throws IOException {
284      try {
285        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
286      } catch (InterruptedException e) {
287        Thread.currentThread().interrupt();
288      }
289      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
290    }
291
292    public static Filter parseFrom(final byte[] pbBytes) {
293      return new SparseRowFilter();
294    }
295  }
296
297  /**
298   * Test the case that there is a filter which filters most of cells
299   */
300  @Test
301  public void testHeartbeatWithSparseCellFilter() throws Exception {
302    testImportanceOfHeartbeats(new Callable<Void>() {
303      @Override
304      public Void call() throws Exception {
305        Scan scan = new Scan();
306        scan.setMaxResultSize(Long.MAX_VALUE);
307        scan.setCaching(Integer.MAX_VALUE);
308        scan.setFilter(new SparseCellFilter());
309        ResultScanner scanner = TABLE.getScanner(scan);
310        int num = 0;
311        while (scanner.next() != null) {
312          num++;
313        }
314        assertEquals(1, num);
315        scanner.close();
316
317        scan = new Scan();
318        scan.setMaxResultSize(Long.MAX_VALUE);
319        scan.setCaching(Integer.MAX_VALUE);
320        scan.setFilter(new SparseCellFilter());
321        scan.setAllowPartialResults(true);
322        scanner = TABLE.getScanner(scan);
323        num = 0;
324        while (scanner.next() != null) {
325          num++;
326        }
327        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
328        scanner.close();
329
330        return null;
331      }
332    });
333  }
334
335  /**
336   * Test the case that there is a filter which filters most of rows
337   */
338  @Test
339  public void testHeartbeatWithSparseRowFilter() throws Exception {
340    testImportanceOfHeartbeats(new Callable<Void>() {
341      @Override
342      public Void call() throws Exception {
343        Scan scan = new Scan();
344        scan.setMaxResultSize(Long.MAX_VALUE);
345        scan.setCaching(Integer.MAX_VALUE);
346        scan.setFilter(new SparseRowFilter());
347        ResultScanner scanner = TABLE.getScanner(scan);
348        int num = 0;
349        while (scanner.next() != null) {
350          num++;
351        }
352        assertEquals(1, num);
353        scanner.close();
354
355        return null;
356      }
357    });
358  }
359
360  /**
361   * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
362   * necessary
363   * @param scan The scan configuration being tested
364   * @param rowSleepTime The time to sleep between fetches of row cells
365   * @param cfSleepTime The time to sleep between fetches of column family cells
366   * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
367   *          that column family are fetched
368   */
369  private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
370      int cfSleepTime, boolean sleepBeforeCf) throws Exception {
371    disableSleeping();
372    final ResultScanner scanner = TABLE.getScanner(scan);
373    final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
374
375    Result r1 = null;
376    Result r2 = null;
377
378    while ((r1 = scanner.next()) != null) {
379      // Enforce the specified sleep conditions during calls to the heartbeat scanner
380      configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
381      r2 = scannerWithHeartbeats.next();
382      disableSleeping();
383
384      assertTrue(r2 != null);
385      try {
386        Result.compareResults(r1, r2);
387      } catch (Exception e) {
388        fail(e.getMessage());
389      }
390    }
391
392    assertTrue(scannerWithHeartbeats.next() == null);
393    scanner.close();
394    scannerWithHeartbeats.close();
395  }
396
397  /**
398   * Helper method for setting the time to sleep between rows and column families. If a sleep time
399   * is negative then that sleep will be disabled
400   */
401  private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
402    HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
403    HeartbeatHRegion.rowSleepTime = rowSleepTime;
404
405    HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
406    HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
407    HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
408  }
409
410  /**
411   * Disable the sleeping mechanism server side.
412   */
413  private static void disableSleeping() {
414    HeartbeatHRegion.sleepBetweenRows = false;
415    HeartbeatHRegion.sleepBetweenColumnFamilies = false;
416  }
417
418  /**
419   * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
420   * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
421   */
422  private static class HeartbeatHRegionServer extends HRegionServer {
423    public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
424      super(conf);
425    }
426
427    @Override
428    protected RSRpcServices createRpcServices() throws IOException {
429      return new HeartbeatRPCServices(this);
430    }
431  }
432
433  /**
434   * Custom RSRpcServices instance that allows heartbeat support to be toggled
435   */
436  private static class HeartbeatRPCServices extends RSRpcServices {
437    private static volatile boolean heartbeatsEnabled = true;
438
439    public HeartbeatRPCServices(HRegionServer rs) throws IOException {
440      super(rs);
441    }
442
443    @Override
444    public ScanResponse scan(RpcController controller, ScanRequest request)
445        throws ServiceException {
446      ScanRequest.Builder builder = ScanRequest.newBuilder(request);
447      builder.setClientHandlesHeartbeats(heartbeatsEnabled);
448      return super.scan(controller, builder.build());
449    }
450  }
451
452  /**
453   * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
454   * between fetches of row Results and/or column family cells. Useful for emulating an instance
455   * where the server is taking a long time to process a client's scan request
456   */
457  private static class HeartbeatHRegion extends HRegion {
458    // Row sleeps occur AFTER each row worth of cells is retrieved.
459    private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
460    private static volatile boolean sleepBetweenRows = false;
461
462    // The sleep for column families can be initiated before or after we fetch the cells for the
463    // column family. If the sleep occurs BEFORE then the time limits will be reached inside
464    // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
465    // limit will be reached inside RegionScanner after all the cells for a column family have been
466    // retrieved.
467    private static volatile boolean sleepBeforeColumnFamily = false;
468    private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
469    private static volatile boolean sleepBetweenColumnFamilies = false;
470
471    public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
472        RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
473      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
474    }
475
476    public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
477        TableDescriptor htd, RegionServerServices rsServices) {
478      super(fs, wal, confParam, htd, rsServices);
479    }
480
481    private static void columnFamilySleep() {
482      if (sleepBetweenColumnFamilies) {
483        Threads.sleepWithoutInterrupt(columnFamilySleepTime);
484      }
485    }
486
487    private static void rowSleep() {
488      if (sleepBetweenRows) {
489        Threads.sleepWithoutInterrupt(rowSleepTime);
490      }
491    }
492
493    // Instantiate the custom heartbeat region scanners
494    @Override
495    protected RegionScannerImpl instantiateRegionScanner(Scan scan,
496        List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
497      if (scan.isReversed()) {
498        if (scan.getFilter() != null) {
499          scan.getFilter().setReversed(true);
500        }
501        return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
502      }
503      return new HeartbeatRegionScanner(scan, additionalScanners, this);
504    }
505  }
506
507  /**
508   * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
509   * and/or column family cells
510   */
511  private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
512    HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
513        HRegion region) throws IOException {
514      super(scan, additionalScanners, region);
515    }
516
517    @Override
518    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
519      boolean moreRows = super.nextRaw(outResults, context);
520      HeartbeatHRegion.rowSleep();
521      return moreRows;
522    }
523
524    @Override
525    protected void initializeKVHeap(List<KeyValueScanner> scanners,
526        List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
527      this.storeHeap =
528          new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
529      if (!joinedScanners.isEmpty()) {
530        this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
531            (CellComparatorImpl) region.getCellComparator());
532      }
533    }
534  }
535
536  /**
537   * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
538   * column family cells
539   */
540  private static class HeartbeatRegionScanner extends RegionScannerImpl {
541    HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
542        throws IOException {
543      region.super(scan, additionalScanners, region);
544    }
545
546    @Override
547    public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
548      boolean moreRows = super.nextRaw(outResults, context);
549      HeartbeatHRegion.rowSleep();
550      return moreRows;
551    }
552
553    @Override
554    protected void initializeKVHeap(List<KeyValueScanner> scanners,
555        List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
556      this.storeHeap =
557          new HeartbeatKVHeap(scanners, region.getCellComparator());
558      if (!joinedScanners.isEmpty()) {
559        this.joinedHeap =
560            new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
561      }
562    }
563  }
564
565  /**
566   * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
567   * cells. Useful for testing
568   */
569  private static final class HeartbeatKVHeap extends KeyValueHeap {
570    public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
571        throws IOException {
572      super(scanners, comparator);
573    }
574
575    HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
576        throws IOException {
577      super(scanners, comparator);
578    }
579
580    @Override
581    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
582      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
583      boolean moreRows = super.next(result, context);
584      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
585      return moreRows;
586    }
587  }
588
589  /**
590   * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
591   * cells.
592   */
593  private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
594    public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
595        CellComparatorImpl comparator) throws IOException {
596      super(scanners, comparator);
597    }
598
599    @Override
600    public boolean next(List<Cell> result, ScannerContext context) throws IOException {
601      if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
602      boolean moreRows = super.next(result, context);
603      if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
604      return moreRows;
605    }
606  }
607}