001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HTestConst;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.UnknownScannerException;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Delete;
042import org.apache.hadoop.hbase.client.Get;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.RegionInfo;
045import org.apache.hadoop.hbase.client.RegionInfoBuilder;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.filter.Filter;
053import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
054import org.apache.hadoop.hbase.filter.PrefixFilter;
055import org.apache.hadoop.hbase.filter.WhileMatchFilter;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.junit.ClassRule;
061import org.junit.Rule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.rules.TestName;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068/**
069 * Test of a long-lived scanner validating as we go.
070 */
071@Category({RegionServerTests.class, MediumTests.class})
072public class TestScanner {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076      HBaseClassTestRule.forClass(TestScanner.class);
077
078  @Rule public TestName name = new TestName();
079
080  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
081  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
082
083  private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
084  private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
085  private static final byte [][] EXPLICIT_COLS = {
086    HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
087      // TODO ryan
088      //HConstants.STARTCODE_QUALIFIER
089  };
090
091  static final TableDescriptor TESTTABLEDESC =
092    TableDescriptorBuilder.newBuilder(TableName.valueOf("testscanner"))
093      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
094        // Ten is an arbitrary number. Keep versions to help debugging.
095        .setMaxVersions(10).setBlockCacheEnabled(false).setBlocksize(8 * 1024).build())
096      .build();
097
098  /** HRegionInfo for root region */
099  public static final RegionInfo REGION_INFO =
100    RegionInfoBuilder.newBuilder(TESTTABLEDESC.getTableName()).build();
101
102  private static final byte[] ROW_KEY = REGION_INFO.getRegionName();
103
104  private static final long START_CODE = Long.MAX_VALUE;
105
106  private HRegion region;
107
108  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
109  final private byte[] col1;
110
111  public TestScanner() {
112    super();
113
114    firstRowBytes = START_KEY_BYTES;
115    secondRowBytes = START_KEY_BYTES.clone();
116    // Increment the least significant character so we get to next row.
117    secondRowBytes[START_KEY_BYTES.length - 1]++;
118    thirdRowBytes = START_KEY_BYTES.clone();
119    thirdRowBytes[START_KEY_BYTES.length - 1] =
120        (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
121    col1 = Bytes.toBytes("column1");
122  }
123
124  /**
125   * Test basic stop row filter works.
126   */
127  @Test
128  public void testStopRow() throws Exception {
129    byte [] startrow = Bytes.toBytes("bbb");
130    byte [] stoprow = Bytes.toBytes("ccc");
131    try {
132      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
133      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
134      List<Cell> results = new ArrayList<>();
135      // Do simple test of getting one row only first.
136      Scan scan = new Scan().withStartRow(Bytes.toBytes("abc"))
137        .withStopRow(Bytes.toBytes("abd"));
138      scan.addFamily(HConstants.CATALOG_FAMILY);
139
140      InternalScanner s = region.getScanner(scan);
141      int count = 0;
142      while (s.next(results)) {
143        count++;
144      }
145      s.close();
146      assertEquals(0, count);
147      // Now do something a bit more imvolved.
148      scan = new Scan().withStartRow(startrow).withStopRow(stoprow);
149      scan.addFamily(HConstants.CATALOG_FAMILY);
150
151      s = region.getScanner(scan);
152      count = 0;
153      Cell kv = null;
154      results = new ArrayList<>();
155      for (boolean first = true; s.next(results);) {
156        kv = results.get(0);
157        if (first) {
158          assertTrue(CellUtil.matchingRows(kv,  startrow));
159          first = false;
160        }
161        count++;
162      }
163      assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
164      // We got something back.
165      assertTrue(count > 10);
166      s.close();
167    } finally {
168      HBaseTestingUtil.closeRegionAndWAL(this.region);
169    }
170  }
171
172  void rowPrefixFilter(Scan scan) throws IOException {
173    List<Cell> results = new ArrayList<>();
174    scan.addFamily(HConstants.CATALOG_FAMILY);
175    InternalScanner s = region.getScanner(scan);
176    boolean hasMore = true;
177    while (hasMore) {
178      hasMore = s.next(results);
179      for (Cell kv : results) {
180        assertEquals((byte)'a', CellUtil.cloneRow(kv)[0]);
181        assertEquals((byte)'b', CellUtil.cloneRow(kv)[1]);
182      }
183      results.clear();
184    }
185    s.close();
186  }
187
188  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
189    List<Cell> results = new ArrayList<>();
190    scan.addFamily(HConstants.CATALOG_FAMILY);
191    InternalScanner s = region.getScanner(scan);
192    boolean hasMore = true;
193    while (hasMore) {
194      hasMore = s.next(results);
195      for (Cell kv : results) {
196        assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
197      }
198      results.clear();
199    }
200    s.close();
201  }
202
203  @Test
204  public void testFilters() throws IOException {
205    try {
206      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
207      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
208      byte [] prefix = Bytes.toBytes("ab");
209      Filter newFilter = new PrefixFilter(prefix);
210      Scan scan = new Scan();
211      scan.setFilter(newFilter);
212      rowPrefixFilter(scan);
213
214      byte[] stopRow = Bytes.toBytes("bbc");
215      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
216      scan = new Scan();
217      scan.setFilter(newFilter);
218      rowInclusiveStopFilter(scan, stopRow);
219
220    } finally {
221      HBaseTestingUtil.closeRegionAndWAL(this.region);
222    }
223  }
224
225  /**
226   * Test that closing a scanner while a client is using it doesn't throw
227   * NPEs but instead a UnknownScannerException. HBASE-2503
228   */
229  @Test
230  public void testRaceBetweenClientAndTimeout() throws Exception {
231    try {
232      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
233      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
234      Scan scan = new Scan();
235      InternalScanner s = region.getScanner(scan);
236      List<Cell> results = new ArrayList<>();
237      try {
238        s.next(results);
239        s.close();
240        s.next(results);
241        fail("We don't want anything more, we should be failing");
242      } catch (UnknownScannerException ex) {
243        // ok!
244        return;
245      }
246    } finally {
247      HBaseTestingUtil.closeRegionAndWAL(this.region);
248    }
249  }
250
251  /** The test!
252   */
253  @Test
254  public void testScanner() throws IOException {
255    try {
256      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
257      Table table = new RegionAsTable(region);
258
259      // Write information to the meta table
260
261      Put put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
262
263      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
264          RegionInfo.toByteArray(REGION_INFO));
265      table.put(put);
266
267      // What we just committed is in the memstore. Verify that we can get
268      // it back both with scanning and get
269
270      scan(false, null);
271      getRegionInfo(table);
272
273      // Close and re-open
274
275      ((HRegion)region).close();
276      region = HRegion.openHRegion(region, null);
277      table = new RegionAsTable(region);
278
279      // Verify we can get the data back now that it is on disk.
280
281      scan(false, null);
282      getRegionInfo(table);
283
284      // Store some new information
285
286      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtil.randomFreePort();
287
288      put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
289      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
290          Bytes.toBytes(address));
291
292//      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
293
294      table.put(put);
295
296      // Validate that we can still get the HRegionInfo, even though it is in
297      // an older row on disk and there is a newer row in the memstore
298
299      scan(true, address.toString());
300      getRegionInfo(table);
301
302      // flush cache
303      this.region.flush(true);
304
305      // Validate again
306
307      scan(true, address.toString());
308      getRegionInfo(table);
309
310      // Close and reopen
311
312      ((HRegion)region).close();
313      region = HRegion.openHRegion(region,null);
314      table = new RegionAsTable(region);
315
316      // Validate again
317
318      scan(true, address.toString());
319      getRegionInfo(table);
320
321      // Now update the information again
322
323      address = "bar.foo.com:4321";
324
325      put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
326
327      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
328      table.put(put);
329
330      // Validate again
331
332      scan(true, address.toString());
333      getRegionInfo(table);
334
335      // flush cache
336
337      region.flush(true);
338
339      // Validate again
340
341      scan(true, address.toString());
342      getRegionInfo(table);
343
344      // Close and reopen
345
346      ((HRegion)this.region).close();
347      this.region = HRegion.openHRegion(region, null);
348      table = new RegionAsTable(this.region);
349
350      // Validate again
351
352      scan(true, address.toString());
353      getRegionInfo(table);
354
355    } finally {
356      // clean up
357      HBaseTestingUtil.closeRegionAndWAL(this.region);
358    }
359  }
360
361  /** Compare the HRegionInfo we read from HBase to what we stored */
362  private void validateRegionInfo(byte [] regionBytes) throws IOException {
363    RegionInfo info = RegionInfo.parseFromOrNull(regionBytes);
364
365    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
366    assertEquals(0, info.getStartKey().length);
367    assertEquals(0, info.getEndKey().length);
368    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
369    //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
370  }
371
372  /** Use a scanner to get the region info and then validate the results */
373  private void scan(boolean validateStartcode, String serverName)
374      throws IOException {
375    InternalScanner scanner = null;
376    Scan scan = null;
377    List<Cell> results = new ArrayList<>();
378    byte [][][] scanColumns = {COLS, EXPLICIT_COLS};
379    for(int i = 0; i < scanColumns.length; i++) {
380      try {
381        scan = new Scan().withStartRow(FIRST_ROW);
382        for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
383          scan.addColumn(COLS[0],  EXPLICIT_COLS[ii]);
384        }
385        scanner = region.getScanner(scan);
386        while (scanner.next(results)) {
387          assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
388              HConstants.REGIONINFO_QUALIFIER));
389          byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
390              HConstants.REGIONINFO_QUALIFIER));
391          validateRegionInfo(val);
392          if(validateStartcode) {
393//            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
394//                HConstants.STARTCODE_QUALIFIER));
395//            val = getColumn(results, HConstants.CATALOG_FAMILY,
396//                HConstants.STARTCODE_QUALIFIER).getValue();
397            assertNotNull(val);
398            assertFalse(val.length == 0);
399            long startCode = Bytes.toLong(val);
400            assertEquals(START_CODE, startCode);
401          }
402
403          if(serverName != null) {
404            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
405                HConstants.SERVER_QUALIFIER));
406            val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
407                HConstants.SERVER_QUALIFIER));
408            assertNotNull(val);
409            assertFalse(val.length == 0);
410            String server = Bytes.toString(val);
411            assertEquals(0, server.compareTo(serverName));
412          }
413        }
414      } finally {
415        InternalScanner s = scanner;
416        scanner = null;
417        if(s != null) {
418          s.close();
419        }
420      }
421    }
422  }
423
424  private boolean hasColumn(final List<Cell> kvs, final byte [] family,
425      final byte [] qualifier) {
426    for (Cell kv: kvs) {
427      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
428        return true;
429      }
430    }
431    return false;
432  }
433
434  private Cell getColumn(final List<Cell> kvs, final byte [] family,
435      final byte [] qualifier) {
436    for (Cell kv: kvs) {
437      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
438        return kv;
439      }
440    }
441    return null;
442  }
443
444
445  /** Use get to retrieve the HRegionInfo and validate it */
446  private void getRegionInfo(Table table) throws IOException {
447    Get get = new Get(ROW_KEY);
448    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
449    Result result = table.get(get);
450    byte [] bytes = result.value();
451    validateRegionInfo(bytes);
452  }
453
454  /**
455   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
456   * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
457   * HBase-910.
458   */
459  @Test
460  public void testScanAndSyncFlush() throws Exception {
461    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
462    Table hri = new RegionAsTable(region);
463    try {
464      LOG.info("Added: " +
465        HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
466          Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
467      int count = count(hri, -1, false);
468      assertEquals(count, count(hri, 100, false)); // do a sync flush.
469    } catch (Exception e) {
470      LOG.error("Failed", e);
471      throw e;
472    } finally {
473      HBaseTestingUtil.closeRegionAndWAL(this.region);
474    }
475  }
476
477  /**
478   * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
479   * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
480   */
481  @Test
482  public void testScanAndRealConcurrentFlush() throws Exception {
483    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
484    Table hri = new RegionAsTable(region);
485    try {
486      LOG.info("Added: " +
487        HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
488          Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
489      int count = count(hri, -1, false);
490      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
491    } catch (Exception e) {
492      LOG.error("Failed", e);
493      throw e;
494    } finally {
495      HBaseTestingUtil.closeRegionAndWAL(this.region);
496    }
497  }
498
499  /**
500   * Make sure scanner returns correct result when we run a major compaction
501   * with deletes.
502   */
503  @Test
504  public void testScanAndConcurrentMajorCompact() throws Exception {
505    TableDescriptor htd = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()),
506      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
507      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
508    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
509    Table hri = new RegionAsTable(region);
510
511    try {
512      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
513          firstRowBytes, secondRowBytes);
514      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
515          firstRowBytes, secondRowBytes);
516
517      Delete dc = new Delete(firstRowBytes);
518      /* delete column1 of firstRow */
519      dc.addColumns(fam1, col1);
520      region.delete(dc);
521      region.flush(true);
522
523      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
524          secondRowBytes, thirdRowBytes);
525      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
526          secondRowBytes, thirdRowBytes);
527      region.flush(true);
528
529      InternalScanner s = region.getScanner(new Scan());
530      // run a major compact, column1 of firstRow will be cleaned.
531      region.compact(true);
532
533      List<Cell> results = new ArrayList<>();
534      s.next(results);
535
536      // make sure returns column2 of firstRow
537      assertTrue("result is not correct, keyValues : " + results,
538          results.size() == 1);
539      assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes));
540      assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
541
542      results = new ArrayList<>();
543      s.next(results);
544
545      // get secondRow
546      assertTrue(results.size() == 2);
547      assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
548      assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
549      assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
550    } finally {
551      HBaseTestingUtil.closeRegionAndWAL(this.region);
552    }
553  }
554
555
556  /*
557   * @param hri Region
558   * @param flushIndex At what row we start the flush.
559   * @param concurrent if the flush should be concurrent or sync.
560   * @return Count of rows found.
561   * @throws IOException
562   */
563  private int count(final Table countTable, final int flushIndex, boolean concurrent)
564      throws IOException {
565    LOG.info("Taking out counting scan");
566    Scan scan = new Scan();
567    for (byte [] qualifier: EXPLICIT_COLS) {
568      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
569    }
570    ResultScanner s = countTable.getScanner(scan);
571    int count = 0;
572    boolean justFlushed = false;
573    while (s.next() != null) {
574      if (justFlushed) {
575        LOG.info("after next() just after next flush");
576        justFlushed = false;
577      }
578      count++;
579      if (flushIndex == count) {
580        LOG.info("Starting flush at flush index " + flushIndex);
581        Thread t = new Thread() {
582          @Override
583          public void run() {
584            try {
585              region.flush(true);
586              LOG.info("Finishing flush");
587            } catch (IOException e) {
588              LOG.info("Failed flush cache");
589            }
590          }
591        };
592        if (concurrent) {
593          t.start(); // concurrently flush.
594        } else {
595          t.run(); // sync flush
596        }
597        LOG.info("Continuing on after kicking off background flush");
598        justFlushed = true;
599      }
600    }
601    s.close();
602    LOG.info("Found " + count + " items");
603    return count;
604  }
605}