001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertFalse;
025import static org.junit.jupiter.api.Assertions.assertNotNull;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027import static org.junit.jupiter.api.Assertions.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.HTestConst;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.UnknownScannerException;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Delete;
041import org.apache.hadoop.hbase.client.Get;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionInfoBuilder;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.filter.Filter;
052import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
053import org.apache.hadoop.hbase.filter.PrefixFilter;
054import org.apache.hadoop.hbase.filter.WhileMatchFilter;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.testclassification.RegionServerTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.junit.jupiter.api.Tag;
060import org.junit.jupiter.api.Test;
061import org.junit.jupiter.api.TestInfo;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * Test of a long-lived scanner validating as we go.
067 */
068@Tag(RegionServerTests.TAG)
069@Tag(MediumTests.TAG)
070public class TestScanner {
071
072  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
073  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
074
075  private static final byte[] FIRST_ROW = HConstants.EMPTY_START_ROW;
076  private static final byte[][] COLS = { HConstants.CATALOG_FAMILY };
077  private static final byte[][] EXPLICIT_COLS =
078    { HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
079    // TODO ryan
080    // HConstants.STARTCODE_QUALIFIER
081    };
082
083  static final TableDescriptor TESTTABLEDESC =
084    TableDescriptorBuilder.newBuilder(TableName.valueOf("testscanner"))
085      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
086        // Ten is an arbitrary number. Keep versions to help debugging.
087        .setMaxVersions(10).setBlockCacheEnabled(false).setBlocksize(8 * 1024).build())
088      .build();
089
090  /** HRegionInfo for root region */
091  public static final RegionInfo REGION_INFO =
092    RegionInfoBuilder.newBuilder(TESTTABLEDESC.getTableName()).build();
093
094  private static final byte[] ROW_KEY = REGION_INFO.getRegionName();
095
096  private static final long START_CODE = Long.MAX_VALUE;
097
098  private HRegion region;
099
100  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
101  final private byte[] col1;
102
103  public TestScanner() {
104    super();
105
106    firstRowBytes = START_KEY_BYTES;
107    secondRowBytes = START_KEY_BYTES.clone();
108    // Increment the least significant character so we get to next row.
109    secondRowBytes[START_KEY_BYTES.length - 1]++;
110    thirdRowBytes = START_KEY_BYTES.clone();
111    thirdRowBytes[START_KEY_BYTES.length - 1] =
112      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
113    col1 = Bytes.toBytes("column1");
114  }
115
116  /**
117   * Test basic stop row filter works.
118   */
119  @Test
120  public void testStopRow() throws Exception {
121    byte[] startrow = Bytes.toBytes("bbb");
122    byte[] stoprow = Bytes.toBytes("ccc");
123    try {
124      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
125      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
126      List<Cell> results = new ArrayList<>();
127      // Do simple test of getting one row only first.
128      Scan scan = new Scan().withStartRow(Bytes.toBytes("abc")).withStopRow(Bytes.toBytes("abd"));
129      scan.addFamily(HConstants.CATALOG_FAMILY);
130
131      InternalScanner s = region.getScanner(scan);
132      int count = 0;
133      while (s.next(results)) {
134        count++;
135      }
136      s.close();
137      assertEquals(0, count);
138      // Now do something a bit more imvolved.
139      scan = new Scan().withStartRow(startrow).withStopRow(stoprow);
140      scan.addFamily(HConstants.CATALOG_FAMILY);
141
142      s = region.getScanner(scan);
143      count = 0;
144      Cell kv = null;
145      results = new ArrayList<>();
146      for (boolean first = true; s.next(results);) {
147        kv = results.get(0);
148        if (first) {
149          assertTrue(CellUtil.matchingRows(kv, startrow));
150          first = false;
151        }
152        count++;
153      }
154      assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
155      // We got something back.
156      assertTrue(count > 10);
157      s.close();
158    } finally {
159      HBaseTestingUtil.closeRegionAndWAL(this.region);
160    }
161  }
162
163  void rowPrefixFilter(Scan scan) throws IOException {
164    List<Cell> results = new ArrayList<>();
165    scan.addFamily(HConstants.CATALOG_FAMILY);
166    InternalScanner s = region.getScanner(scan);
167    boolean hasMore = true;
168    while (hasMore) {
169      hasMore = s.next(results);
170      for (Cell kv : results) {
171        assertEquals((byte) 'a', CellUtil.cloneRow(kv)[0]);
172        assertEquals((byte) 'b', CellUtil.cloneRow(kv)[1]);
173      }
174      results.clear();
175    }
176    s.close();
177  }
178
179  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
180    List<Cell> results = new ArrayList<>();
181    scan.addFamily(HConstants.CATALOG_FAMILY);
182    InternalScanner s = region.getScanner(scan);
183    boolean hasMore = true;
184    while (hasMore) {
185      hasMore = s.next(results);
186      for (Cell kv : results) {
187        assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
188      }
189      results.clear();
190    }
191    s.close();
192  }
193
194  @Test
195  public void testFilters() throws IOException {
196    try {
197      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
198      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
199      byte[] prefix = Bytes.toBytes("ab");
200      Filter newFilter = new PrefixFilter(prefix);
201      Scan scan = new Scan();
202      scan.setFilter(newFilter);
203      rowPrefixFilter(scan);
204
205      byte[] stopRow = Bytes.toBytes("bbc");
206      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
207      scan = new Scan();
208      scan.setFilter(newFilter);
209      rowInclusiveStopFilter(scan, stopRow);
210
211    } finally {
212      HBaseTestingUtil.closeRegionAndWAL(this.region);
213    }
214  }
215
216  /**
217   * Test that closing a scanner while a client is using it doesn't throw NPEs but instead a
218   * UnknownScannerException. HBASE-2503
219   */
220  @Test
221  public void testRaceBetweenClientAndTimeout() throws Exception {
222    try {
223      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
224      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
225      Scan scan = new Scan();
226      InternalScanner s = region.getScanner(scan);
227      List<Cell> results = new ArrayList<>();
228      try {
229        s.next(results);
230        s.close();
231        s.next(results);
232        fail("We don't want anything more, we should be failing");
233      } catch (UnknownScannerException ex) {
234        // ok!
235        return;
236      }
237    } finally {
238      HBaseTestingUtil.closeRegionAndWAL(this.region);
239    }
240  }
241
242  /**
243   * The test!
244   */
245  @Test
246  public void testScanner() throws IOException {
247    try {
248      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
249      Table table = new RegionAsTable(region);
250
251      // Write information to the meta table
252
253      Put put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
254
255      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
256        RegionInfo.toByteArray(REGION_INFO));
257      table.put(put);
258
259      // What we just committed is in the memstore. Verify that we can get
260      // it back both with scanning and get
261
262      scan(false, null);
263      getRegionInfo(table);
264
265      // Close and re-open
266
267      ((HRegion) region).close();
268      region = HRegion.openHRegion(region, null);
269      table = new RegionAsTable(region);
270
271      // Verify we can get the data back now that it is on disk.
272
273      scan(false, null);
274      getRegionInfo(table);
275
276      // Store some new information
277
278      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtil.randomFreePort();
279
280      put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
281      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
282
283      // put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
284
285      table.put(put);
286
287      // Validate that we can still get the HRegionInfo, even though it is in
288      // an older row on disk and there is a newer row in the memstore
289
290      scan(true, address.toString());
291      getRegionInfo(table);
292
293      // flush cache
294      this.region.flush(true);
295
296      // Validate again
297
298      scan(true, address.toString());
299      getRegionInfo(table);
300
301      // Close and reopen
302
303      ((HRegion) region).close();
304      region = HRegion.openHRegion(region, null);
305      table = new RegionAsTable(region);
306
307      // Validate again
308
309      scan(true, address.toString());
310      getRegionInfo(table);
311
312      // Now update the information again
313
314      address = "bar.foo.com:4321";
315
316      put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
317
318      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
319      table.put(put);
320
321      // Validate again
322
323      scan(true, address.toString());
324      getRegionInfo(table);
325
326      // flush cache
327
328      region.flush(true);
329
330      // Validate again
331
332      scan(true, address.toString());
333      getRegionInfo(table);
334
335      // Close and reopen
336
337      ((HRegion) this.region).close();
338      this.region = HRegion.openHRegion(region, null);
339      table = new RegionAsTable(this.region);
340
341      // Validate again
342
343      scan(true, address.toString());
344      getRegionInfo(table);
345
346    } finally {
347      // clean up
348      HBaseTestingUtil.closeRegionAndWAL(this.region);
349    }
350  }
351
352  /** Compare the HRegionInfo we read from HBase to what we stored */
353  private void validateRegionInfo(byte[] regionBytes) throws IOException {
354    RegionInfo info = RegionInfo.parseFromOrNull(regionBytes);
355
356    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
357    assertEquals(0, info.getStartKey().length);
358    assertEquals(0, info.getEndKey().length);
359    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
360    // assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
361  }
362
363  /** Use a scanner to get the region info and then validate the results */
364  private void scan(boolean validateStartcode, String serverName) throws IOException {
365    InternalScanner scanner = null;
366    Scan scan = null;
367    List<Cell> results = new ArrayList<>();
368    byte[][][] scanColumns = { COLS, EXPLICIT_COLS };
369    for (int i = 0; i < scanColumns.length; i++) {
370      try {
371        scan = new Scan().withStartRow(FIRST_ROW);
372        for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
373          scan.addColumn(COLS[0], EXPLICIT_COLS[ii]);
374        }
375        scanner = region.getScanner(scan);
376        while (scanner.next(results)) {
377          assertTrue(
378            hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
379          byte[] val = CellUtil.cloneValue(
380            getColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
381          validateRegionInfo(val);
382          if (validateStartcode) {
383            // assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
384            // HConstants.STARTCODE_QUALIFIER));
385            // val = getColumn(results, HConstants.CATALOG_FAMILY,
386            // HConstants.STARTCODE_QUALIFIER).getValue();
387            assertNotNull(val);
388            assertFalse(val.length == 0);
389            long startCode = Bytes.toLong(val);
390            assertEquals(START_CODE, startCode);
391          }
392
393          if (serverName != null) {
394            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER));
395            val = CellUtil.cloneValue(
396              getColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER));
397            assertNotNull(val);
398            assertFalse(val.length == 0);
399            String server = Bytes.toString(val);
400            assertEquals(0, server.compareTo(serverName));
401          }
402        }
403      } finally {
404        InternalScanner s = scanner;
405        scanner = null;
406        if (s != null) {
407          s.close();
408        }
409      }
410    }
411  }
412
413  private boolean hasColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) {
414    for (Cell kv : kvs) {
415      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
416        return true;
417      }
418    }
419    return false;
420  }
421
422  private Cell getColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) {
423    for (Cell kv : kvs) {
424      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
425        return kv;
426      }
427    }
428    return null;
429  }
430
431  /** Use get to retrieve the HRegionInfo and validate it */
432  private void getRegionInfo(Table table) throws IOException {
433    Get get = new Get(ROW_KEY);
434    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
435    Result result = table.get(get);
436    byte[] bytes = result.value();
437    validateRegionInfo(bytes);
438  }
439
440  /**
441   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner update
442   * readers code essentially. This is not highly concurrent, since its all 1 thread. HBase-910.
443   */
444  @Test
445  public void testScanAndSyncFlush() throws Exception {
446    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
447    Table hri = new RegionAsTable(region);
448    try {
449      LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
450        Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
451      int count = count(hri, -1, false);
452      assertEquals(count, count(hri, 100, false)); // do a sync flush.
453    } catch (Exception e) {
454      LOG.error("Failed", e);
455      throw e;
456    } finally {
457      HBaseTestingUtil.closeRegionAndWAL(this.region);
458    }
459  }
460
461  /**
462   * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both the
463   * StoreScanner update readers and the transition from memstore -> snapshot -> store file.
464   */
465  @Test
466  public void testScanAndRealConcurrentFlush() throws Exception {
467    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
468    Table hri = new RegionAsTable(region);
469    try {
470      LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
471        Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
472      int count = count(hri, -1, false);
473      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
474    } catch (Exception e) {
475      LOG.error("Failed", e);
476      throw e;
477    } finally {
478      HBaseTestingUtil.closeRegionAndWAL(this.region);
479    }
480  }
481
482  /**
483   * Make sure scanner returns correct result when we run a major compaction with deletes.
484   */
485  @Test
486  public void testScanAndConcurrentMajorCompact(TestInfo testInfo) throws Exception {
487    TableDescriptor htd =
488      TEST_UTIL.createTableDescriptor(TableName.valueOf(testInfo.getTestMethod().get().getName()),
489        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
490        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
491    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
492    Table hri = new RegionAsTable(region);
493
494    try {
495      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes,
496        secondRowBytes);
497      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes,
498        secondRowBytes);
499
500      Delete dc = new Delete(firstRowBytes);
501      /* delete column1 of firstRow */
502      dc.addColumns(fam1, col1);
503      region.delete(dc);
504      region.flush(true);
505
506      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), secondRowBytes,
507        thirdRowBytes);
508      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), secondRowBytes,
509        thirdRowBytes);
510      region.flush(true);
511
512      InternalScanner s = region.getScanner(new Scan());
513      // run a major compact, column1 of firstRow will be cleaned.
514      region.compact(true);
515
516      List<Cell> results = new ArrayList<>();
517      s.next(results);
518
519      // make sure returns column2 of firstRow
520      assertEquals(1, results.size(), "result is not correct, keyValues : " + results);
521      assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes));
522      assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
523
524      results = new ArrayList<>();
525      s.next(results);
526
527      // get secondRow
528      assertEquals(2, results.size());
529      assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
530      assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
531      assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
532    } finally {
533      HBaseTestingUtil.closeRegionAndWAL(this.region);
534    }
535  }
536
537  /**
538   * Count table.
539   * @param countTable Table
540   * @param flushIndex At what row we start the flush.
541   * @param concurrent if the flush should be concurrent or sync.
542   * @return Count of rows found.
543   */
544  private int count(final Table countTable, final int flushIndex, boolean concurrent)
545    throws Exception {
546    LOG.info("Taking out counting scan");
547    Scan scan = new Scan();
548    for (byte[] qualifier : EXPLICIT_COLS) {
549      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
550    }
551    ResultScanner s = countTable.getScanner(scan);
552    int count = 0;
553    boolean justFlushed = false;
554    while (s.next() != null) {
555      if (justFlushed) {
556        LOG.info("after next() just after next flush");
557        justFlushed = false;
558      }
559      count++;
560      if (flushIndex == count) {
561        LOG.info("Starting flush at flush index " + flushIndex);
562        Thread t = new Thread() {
563          @Override
564          public void run() {
565            try {
566              region.flush(true);
567              LOG.info("Finishing flush");
568            } catch (IOException e) {
569              LOG.info("Failed flush cache");
570            }
571          }
572        };
573        t.start();
574        if (!concurrent) {
575          // sync flush
576          t.join();
577        }
578        LOG.info("Continuing on after kicking off background flush");
579        justFlushed = true;
580      }
581    }
582    s.close();
583    LOG.info("Found " + count + " items");
584    return count;
585  }
586}