001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HTestConst;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.UnknownScannerException;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.filter.Filter;
053import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
054import org.apache.hadoop.hbase.filter.PrefixFilter;
055import org.apache.hadoop.hbase.filter.WhileMatchFilter;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.junit.ClassRule;
061import org.junit.Rule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.rules.TestName;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068/**
069 * Test of a long-lived scanner validating as we go.
070 */
071@Category({ RegionServerTests.class, MediumTests.class })
072public class TestScanner {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestScanner.class);
077
078  @Rule
079  public TestName name = new TestName();
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
082  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
083
084  private static final byte[] FIRST_ROW = HConstants.EMPTY_START_ROW;
085  private static final byte[][] COLS = { HConstants.CATALOG_FAMILY };
086  private static final byte[][] EXPLICIT_COLS =
087    { HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
088    // TODO ryan
089    // HConstants.STARTCODE_QUALIFIER
090    };
091
092  static final TableDescriptor TESTTABLEDESC =
093    TableDescriptorBuilder.newBuilder(TableName.valueOf("testscanner"))
094      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
095        // Ten is an arbitrary number. Keep versions to help debugging.
096        .setMaxVersions(10).setBlockCacheEnabled(false).setBlocksize(8 * 1024).build())
097      .build();
098
099  /** HRegionInfo for root region */
100  public static final RegionInfo REGION_INFO =
101    RegionInfoBuilder.newBuilder(TESTTABLEDESC.getTableName()).build();
102
103  private static final byte[] ROW_KEY = REGION_INFO.getRegionName();
104
105  private static final long START_CODE = Long.MAX_VALUE;
106
107  private HRegion region;
108
109  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
110  final private byte[] col1;
111
112  public TestScanner() {
113    super();
114
115    firstRowBytes = START_KEY_BYTES;
116    secondRowBytes = START_KEY_BYTES.clone();
117    // Increment the least significant character so we get to next row.
118    secondRowBytes[START_KEY_BYTES.length - 1]++;
119    thirdRowBytes = START_KEY_BYTES.clone();
120    thirdRowBytes[START_KEY_BYTES.length - 1] =
121      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
122    col1 = Bytes.toBytes("column1");
123  }
124
125  /**
126   * Test basic stop row filter works.
127   */
128  @Test
129  public void testStopRow() throws Exception {
130    byte[] startrow = Bytes.toBytes("bbb");
131    byte[] stoprow = Bytes.toBytes("ccc");
132    try {
133      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
134      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
135      List<Cell> results = new ArrayList<>();
136      // Do simple test of getting one row only first.
137      Scan scan = new Scan().withStartRow(Bytes.toBytes("abc")).withStopRow(Bytes.toBytes("abd"));
138      scan.addFamily(HConstants.CATALOG_FAMILY);
139
140      InternalScanner s = region.getScanner(scan);
141      int count = 0;
142      while (s.next(results)) {
143        count++;
144      }
145      s.close();
146      assertEquals(0, count);
147      // Now do something a bit more imvolved.
148      scan = new Scan().withStartRow(startrow).withStopRow(stoprow);
149      scan.addFamily(HConstants.CATALOG_FAMILY);
150
151      s = region.getScanner(scan);
152      count = 0;
153      Cell kv = null;
154      results = new ArrayList<>();
155      for (boolean first = true; s.next(results);) {
156        kv = results.get(0);
157        if (first) {
158          assertTrue(CellUtil.matchingRows(kv, startrow));
159          first = false;
160        }
161        count++;
162      }
163      assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
164      // We got something back.
165      assertTrue(count > 10);
166      s.close();
167    } finally {
168      HBaseTestingUtil.closeRegionAndWAL(this.region);
169    }
170  }
171
172  void rowPrefixFilter(Scan scan) throws IOException {
173    List<Cell> results = new ArrayList<>();
174    scan.addFamily(HConstants.CATALOG_FAMILY);
175    InternalScanner s = region.getScanner(scan);
176    boolean hasMore = true;
177    while (hasMore) {
178      hasMore = s.next(results);
179      for (Cell kv : results) {
180        assertEquals((byte) 'a', CellUtil.cloneRow(kv)[0]);
181        assertEquals((byte) 'b', CellUtil.cloneRow(kv)[1]);
182      }
183      results.clear();
184    }
185    s.close();
186  }
187
188  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
189    List<Cell> results = new ArrayList<>();
190    scan.addFamily(HConstants.CATALOG_FAMILY);
191    InternalScanner s = region.getScanner(scan);
192    boolean hasMore = true;
193    while (hasMore) {
194      hasMore = s.next(results);
195      for (Cell kv : results) {
196        assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
197      }
198      results.clear();
199    }
200    s.close();
201  }
202
203  @Test
204  public void testFilters() throws IOException {
205    try {
206      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
207      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
208      byte[] prefix = Bytes.toBytes("ab");
209      Filter newFilter = new PrefixFilter(prefix);
210      Scan scan = new Scan();
211      scan.setFilter(newFilter);
212      rowPrefixFilter(scan);
213
214      byte[] stopRow = Bytes.toBytes("bbc");
215      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
216      scan = new Scan();
217      scan.setFilter(newFilter);
218      rowInclusiveStopFilter(scan, stopRow);
219
220    } finally {
221      HBaseTestingUtil.closeRegionAndWAL(this.region);
222    }
223  }
224
225  /**
226   * Test that closing a scanner while a client is using it doesn't throw NPEs but instead a
227   * UnknownScannerException. HBASE-2503
228   */
229  @Test
230  public void testRaceBetweenClientAndTimeout() throws Exception {
231    try {
232      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
233      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
234      Scan scan = new Scan();
235      InternalScanner s = region.getScanner(scan);
236      List<Cell> results = new ArrayList<>();
237      try {
238        s.next(results);
239        s.close();
240        s.next(results);
241        fail("We don't want anything more, we should be failing");
242      } catch (UnknownScannerException ex) {
243        // ok!
244        return;
245      }
246    } finally {
247      HBaseTestingUtil.closeRegionAndWAL(this.region);
248    }
249  }
250
251  /**
252   * The test!
253   */
254  @Test
255  public void testScanner() throws IOException {
256    try {
257      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
258      Table table = new RegionAsTable(region);
259
260      // Write information to the meta table
261
262      Put put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
263
264      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
265        RegionInfo.toByteArray(REGION_INFO));
266      table.put(put);
267
268      // What we just committed is in the memstore. Verify that we can get
269      // it back both with scanning and get
270
271      scan(false, null);
272      getRegionInfo(table);
273
274      // Close and re-open
275
276      ((HRegion) region).close();
277      region = HRegion.openHRegion(region, null);
278      table = new RegionAsTable(region);
279
280      // Verify we can get the data back now that it is on disk.
281
282      scan(false, null);
283      getRegionInfo(table);
284
285      // Store some new information
286
287      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtil.randomFreePort();
288
289      put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
290      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
291
292      // put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
293
294      table.put(put);
295
296      // Validate that we can still get the HRegionInfo, even though it is in
297      // an older row on disk and there is a newer row in the memstore
298
299      scan(true, address.toString());
300      getRegionInfo(table);
301
302      // flush cache
303      this.region.flush(true);
304
305      // Validate again
306
307      scan(true, address.toString());
308      getRegionInfo(table);
309
310      // Close and reopen
311
312      ((HRegion) region).close();
313      region = HRegion.openHRegion(region, null);
314      table = new RegionAsTable(region);
315
316      // Validate again
317
318      scan(true, address.toString());
319      getRegionInfo(table);
320
321      // Now update the information again
322
323      address = "bar.foo.com:4321";
324
325      put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
326
327      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
328      table.put(put);
329
330      // Validate again
331
332      scan(true, address.toString());
333      getRegionInfo(table);
334
335      // flush cache
336
337      region.flush(true);
338
339      // Validate again
340
341      scan(true, address.toString());
342      getRegionInfo(table);
343
344      // Close and reopen
345
346      ((HRegion) this.region).close();
347      this.region = HRegion.openHRegion(region, null);
348      table = new RegionAsTable(this.region);
349
350      // Validate again
351
352      scan(true, address.toString());
353      getRegionInfo(table);
354
355    } finally {
356      // clean up
357      HBaseTestingUtil.closeRegionAndWAL(this.region);
358    }
359  }
360
361  /** Compare the HRegionInfo we read from HBase to what we stored */
362  private void validateRegionInfo(byte[] regionBytes) throws IOException {
363    RegionInfo info = RegionInfo.parseFromOrNull(regionBytes);
364
365    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
366    assertEquals(0, info.getStartKey().length);
367    assertEquals(0, info.getEndKey().length);
368    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
369    // assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
370  }
371
372  /** Use a scanner to get the region info and then validate the results */
373  private void scan(boolean validateStartcode, String serverName) throws IOException {
374    InternalScanner scanner = null;
375    Scan scan = null;
376    List<Cell> results = new ArrayList<>();
377    byte[][][] scanColumns = { COLS, EXPLICIT_COLS };
378    for (int i = 0; i < scanColumns.length; i++) {
379      try {
380        scan = new Scan().withStartRow(FIRST_ROW);
381        for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
382          scan.addColumn(COLS[0], EXPLICIT_COLS[ii]);
383        }
384        scanner = region.getScanner(scan);
385        while (scanner.next(results)) {
386          assertTrue(
387            hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
388          byte[] val = CellUtil.cloneValue(
389            getColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
390          validateRegionInfo(val);
391          if (validateStartcode) {
392            // assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
393            // HConstants.STARTCODE_QUALIFIER));
394            // val = getColumn(results, HConstants.CATALOG_FAMILY,
395            // HConstants.STARTCODE_QUALIFIER).getValue();
396            assertNotNull(val);
397            assertFalse(val.length == 0);
398            long startCode = Bytes.toLong(val);
399            assertEquals(START_CODE, startCode);
400          }
401
402          if (serverName != null) {
403            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER));
404            val = CellUtil.cloneValue(
405              getColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER));
406            assertNotNull(val);
407            assertFalse(val.length == 0);
408            String server = Bytes.toString(val);
409            assertEquals(0, server.compareTo(serverName));
410          }
411        }
412      } finally {
413        InternalScanner s = scanner;
414        scanner = null;
415        if (s != null) {
416          s.close();
417        }
418      }
419    }
420  }
421
422  private boolean hasColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) {
423    for (Cell kv : kvs) {
424      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
425        return true;
426      }
427    }
428    return false;
429  }
430
431  private Cell getColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) {
432    for (Cell kv : kvs) {
433      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
434        return kv;
435      }
436    }
437    return null;
438  }
439
440  /** Use get to retrieve the HRegionInfo and validate it */
441  private void getRegionInfo(Table table) throws IOException {
442    Get get = new Get(ROW_KEY);
443    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
444    Result result = table.get(get);
445    byte[] bytes = result.value();
446    validateRegionInfo(bytes);
447  }
448
449  /**
450   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner update
451   * readers code essentially. This is not highly concurrent, since its all 1 thread. HBase-910.
452   */
453  @Test
454  public void testScanAndSyncFlush() throws Exception {
455    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
456    Table hri = new RegionAsTable(region);
457    try {
458      LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
459        Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
460      int count = count(hri, -1, false);
461      assertEquals(count, count(hri, 100, false)); // do a sync flush.
462    } catch (Exception e) {
463      LOG.error("Failed", e);
464      throw e;
465    } finally {
466      HBaseTestingUtil.closeRegionAndWAL(this.region);
467    }
468  }
469
470  /**
471   * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both the
472   * StoreScanner update readers and the transition from memstore -> snapshot -> store file.
473   */
474  @Test
475  public void testScanAndRealConcurrentFlush() throws Exception {
476    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
477    Table hri = new RegionAsTable(region);
478    try {
479      LOG.info("Added: " + HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
480        Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
481      int count = count(hri, -1, false);
482      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
483    } catch (Exception e) {
484      LOG.error("Failed", e);
485      throw e;
486    } finally {
487      HBaseTestingUtil.closeRegionAndWAL(this.region);
488    }
489  }
490
491  /**
492   * Make sure scanner returns correct result when we run a major compaction with deletes.
493   */
494  @Test
495  public void testScanAndConcurrentMajorCompact() throws Exception {
496    TableDescriptor htd = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()),
497      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
498      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
499    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
500    Table hri = new RegionAsTable(region);
501
502    try {
503      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes,
504        secondRowBytes);
505      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes,
506        secondRowBytes);
507
508      Delete dc = new Delete(firstRowBytes);
509      /* delete column1 of firstRow */
510      dc.addColumns(fam1, col1);
511      region.delete(dc);
512      region.flush(true);
513
514      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), secondRowBytes,
515        thirdRowBytes);
516      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), secondRowBytes,
517        thirdRowBytes);
518      region.flush(true);
519
520      InternalScanner s = region.getScanner(new Scan());
521      // run a major compact, column1 of firstRow will be cleaned.
522      region.compact(true);
523
524      List<Cell> results = new ArrayList<>();
525      s.next(results);
526
527      // make sure returns column2 of firstRow
528      assertTrue("result is not correct, keyValues : " + results, results.size() == 1);
529      assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes));
530      assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
531
532      results = new ArrayList<>();
533      s.next(results);
534
535      // get secondRow
536      assertTrue(results.size() == 2);
537      assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
538      assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
539      assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
540    } finally {
541      HBaseTestingUtil.closeRegionAndWAL(this.region);
542    }
543  }
544
545  /**
546   * Count table.
547   * @param hri        Region
548   * @param flushIndex At what row we start the flush.
549   * @param concurrent if the flush should be concurrent or sync.
550   * @return Count of rows found.
551   */
552  private int count(final Table countTable, final int flushIndex, boolean concurrent)
553    throws Exception {
554    LOG.info("Taking out counting scan");
555    Scan scan = new Scan();
556    for (byte[] qualifier : EXPLICIT_COLS) {
557      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
558    }
559    ResultScanner s = countTable.getScanner(scan);
560    int count = 0;
561    boolean justFlushed = false;
562    while (s.next() != null) {
563      if (justFlushed) {
564        LOG.info("after next() just after next flush");
565        justFlushed = false;
566      }
567      count++;
568      if (flushIndex == count) {
569        LOG.info("Starting flush at flush index " + flushIndex);
570        Thread t = new Thread() {
571          @Override
572          public void run() {
573            try {
574              region.flush(true);
575              LOG.info("Finishing flush");
576            } catch (IOException e) {
577              LOG.info("Failed flush cache");
578            }
579          }
580        };
581        t.start();
582        if (!concurrent) {
583          // sync flush
584          t.join();
585        }
586        LOG.info("Continuing on after kicking off background flush");
587        justFlushed = true;
588      }
589    }
590    s.close();
591    LOG.info("Found " + count + " items");
592    return count;
593  }
594}