001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestCase;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HColumnDescriptor;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionInfo;
040import org.apache.hadoop.hbase.HTableDescriptor;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.UnknownScannerException;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.filter.Filter;
051import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
052import org.apache.hadoop.hbase.filter.PrefixFilter;
053import org.apache.hadoop.hbase.filter.WhileMatchFilter;
054import org.apache.hadoop.hbase.testclassification.RegionServerTests;
055import org.apache.hadoop.hbase.testclassification.SmallTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * Test of a long-lived scanner validating as we go.
067 */
068@Category({RegionServerTests.class, SmallTests.class})
069public class TestScanner {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073      HBaseClassTestRule.forClass(TestScanner.class);
074
075  @Rule public TestName name = new TestName();
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
078  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
079
080  private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
081  private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
082  private static final byte [][] EXPLICIT_COLS = {
083    HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
084      // TODO ryan
085      //HConstants.STARTCODE_QUALIFIER
086  };
087
088  static final HTableDescriptor TESTTABLEDESC =
089    new HTableDescriptor(TableName.valueOf("testscanner"));
090  static {
091    TESTTABLEDESC.addFamily(
092        new HColumnDescriptor(HConstants.CATALOG_FAMILY)
093            // Ten is an arbitrary number.  Keep versions to help debugging.
094            .setMaxVersions(10)
095            .setBlockCacheEnabled(false)
096            .setBlocksize(8 * 1024)
097    );
098  }
099  /** HRegionInfo for root region */
100  public static final HRegionInfo REGION_INFO =
101    new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
102    HConstants.EMPTY_BYTE_ARRAY);
103
104  private static final byte [] ROW_KEY = REGION_INFO.getRegionName();
105
106  private static final long START_CODE = Long.MAX_VALUE;
107
108  private HRegion region;
109
110  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
111  final private byte[] col1;
112
113  public TestScanner() {
114    super();
115
116    firstRowBytes = START_KEY_BYTES;
117    secondRowBytes = START_KEY_BYTES.clone();
118    // Increment the least significant character so we get to next row.
119    secondRowBytes[START_KEY_BYTES.length - 1]++;
120    thirdRowBytes = START_KEY_BYTES.clone();
121    thirdRowBytes[START_KEY_BYTES.length - 1] =
122        (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
123    col1 = Bytes.toBytes("column1");
124  }
125
126  /**
127   * Test basic stop row filter works.
128   * @throws Exception
129   */
130  @Test
131  public void testStopRow() throws Exception {
132    byte [] startrow = Bytes.toBytes("bbb");
133    byte [] stoprow = Bytes.toBytes("ccc");
134    try {
135      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
136      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
137      List<Cell> results = new ArrayList<>();
138      // Do simple test of getting one row only first.
139      Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
140      scan.addFamily(HConstants.CATALOG_FAMILY);
141
142      InternalScanner s = region.getScanner(scan);
143      int count = 0;
144      while (s.next(results)) {
145        count++;
146      }
147      s.close();
148      assertEquals(0, count);
149      // Now do something a bit more imvolved.
150      scan = new Scan(startrow, stoprow);
151      scan.addFamily(HConstants.CATALOG_FAMILY);
152
153      s = region.getScanner(scan);
154      count = 0;
155      Cell kv = null;
156      results = new ArrayList<>();
157      for (boolean first = true; s.next(results);) {
158        kv = results.get(0);
159        if (first) {
160          assertTrue(CellUtil.matchingRows(kv,  startrow));
161          first = false;
162        }
163        count++;
164      }
165      assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
166      // We got something back.
167      assertTrue(count > 10);
168      s.close();
169    } finally {
170      HBaseTestingUtility.closeRegionAndWAL(this.region);
171    }
172  }
173
174  void rowPrefixFilter(Scan scan) throws IOException {
175    List<Cell> results = new ArrayList<>();
176    scan.addFamily(HConstants.CATALOG_FAMILY);
177    InternalScanner s = region.getScanner(scan);
178    boolean hasMore = true;
179    while (hasMore) {
180      hasMore = s.next(results);
181      for (Cell kv : results) {
182        assertEquals((byte)'a', CellUtil.cloneRow(kv)[0]);
183        assertEquals((byte)'b', CellUtil.cloneRow(kv)[1]);
184      }
185      results.clear();
186    }
187    s.close();
188  }
189
190  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
191    List<Cell> results = new ArrayList<>();
192    scan.addFamily(HConstants.CATALOG_FAMILY);
193    InternalScanner s = region.getScanner(scan);
194    boolean hasMore = true;
195    while (hasMore) {
196      hasMore = s.next(results);
197      for (Cell kv : results) {
198        assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
199      }
200      results.clear();
201    }
202    s.close();
203  }
204
205  @Test
206  public void testFilters() throws IOException {
207    try {
208      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
209      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
210      byte [] prefix = Bytes.toBytes("ab");
211      Filter newFilter = new PrefixFilter(prefix);
212      Scan scan = new Scan();
213      scan.setFilter(newFilter);
214      rowPrefixFilter(scan);
215
216      byte[] stopRow = Bytes.toBytes("bbc");
217      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
218      scan = new Scan();
219      scan.setFilter(newFilter);
220      rowInclusiveStopFilter(scan, stopRow);
221
222    } finally {
223      HBaseTestingUtility.closeRegionAndWAL(this.region);
224    }
225  }
226
227  /**
228   * Test that closing a scanner while a client is using it doesn't throw
229   * NPEs but instead a UnknownScannerException. HBASE-2503
230   * @throws Exception
231   */
232  @Test
233  public void testRaceBetweenClientAndTimeout() throws Exception {
234    try {
235      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
236      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
237      Scan scan = new Scan();
238      InternalScanner s = region.getScanner(scan);
239      List<Cell> results = new ArrayList<>();
240      try {
241        s.next(results);
242        s.close();
243        s.next(results);
244        fail("We don't want anything more, we should be failing");
245      } catch (UnknownScannerException ex) {
246        // ok!
247        return;
248      }
249    } finally {
250      HBaseTestingUtility.closeRegionAndWAL(this.region);
251    }
252  }
253
254  /** The test!
255   * @throws IOException
256   */
257  @Test
258  public void testScanner() throws IOException {
259    try {
260      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
261      Table table = new RegionAsTable(region);
262
263      // Write information to the meta table
264
265      Put put = new Put(ROW_KEY, System.currentTimeMillis());
266
267      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
268          REGION_INFO.toByteArray());
269      table.put(put);
270
271      // What we just committed is in the memstore. Verify that we can get
272      // it back both with scanning and get
273
274      scan(false, null);
275      getRegionInfo(table);
276
277      // Close and re-open
278
279      ((HRegion)region).close();
280      region = HRegion.openHRegion(region, null);
281      table = new RegionAsTable(region);
282
283      // Verify we can get the data back now that it is on disk.
284
285      scan(false, null);
286      getRegionInfo(table);
287
288      // Store some new information
289
290      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
291
292      put = new Put(ROW_KEY, System.currentTimeMillis());
293      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
294          Bytes.toBytes(address));
295
296//      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
297
298      table.put(put);
299
300      // Validate that we can still get the HRegionInfo, even though it is in
301      // an older row on disk and there is a newer row in the memstore
302
303      scan(true, address.toString());
304      getRegionInfo(table);
305
306      // flush cache
307      this.region.flush(true);
308
309      // Validate again
310
311      scan(true, address.toString());
312      getRegionInfo(table);
313
314      // Close and reopen
315
316      ((HRegion)region).close();
317      region = HRegion.openHRegion(region,null);
318      table = new RegionAsTable(region);
319
320      // Validate again
321
322      scan(true, address.toString());
323      getRegionInfo(table);
324
325      // Now update the information again
326
327      address = "bar.foo.com:4321";
328
329      put = new Put(ROW_KEY, System.currentTimeMillis());
330
331      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
332      table.put(put);
333
334      // Validate again
335
336      scan(true, address.toString());
337      getRegionInfo(table);
338
339      // flush cache
340
341      region.flush(true);
342
343      // Validate again
344
345      scan(true, address.toString());
346      getRegionInfo(table);
347
348      // Close and reopen
349
350      ((HRegion)this.region).close();
351      this.region = HRegion.openHRegion(region, null);
352      table = new RegionAsTable(this.region);
353
354      // Validate again
355
356      scan(true, address.toString());
357      getRegionInfo(table);
358
359    } finally {
360      // clean up
361      HBaseTestingUtility.closeRegionAndWAL(this.region);
362    }
363  }
364
365  /** Compare the HRegionInfo we read from HBase to what we stored */
366  private void validateRegionInfo(byte [] regionBytes) throws IOException {
367    HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
368
369    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
370    assertEquals(0, info.getStartKey().length);
371    assertEquals(0, info.getEndKey().length);
372    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
373    //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
374  }
375
376  /** Use a scanner to get the region info and then validate the results */
377  private void scan(boolean validateStartcode, String serverName)
378  throws IOException {
379    InternalScanner scanner = null;
380    Scan scan = null;
381    List<Cell> results = new ArrayList<>();
382    byte [][][] scanColumns = {
383        COLS,
384        EXPLICIT_COLS
385    };
386
387    for(int i = 0; i < scanColumns.length; i++) {
388      try {
389        scan = new Scan(FIRST_ROW);
390        for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
391          scan.addColumn(COLS[0],  EXPLICIT_COLS[ii]);
392        }
393        scanner = region.getScanner(scan);
394        while (scanner.next(results)) {
395          assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
396              HConstants.REGIONINFO_QUALIFIER));
397          byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
398              HConstants.REGIONINFO_QUALIFIER));
399          validateRegionInfo(val);
400          if(validateStartcode) {
401//            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
402//                HConstants.STARTCODE_QUALIFIER));
403//            val = getColumn(results, HConstants.CATALOG_FAMILY,
404//                HConstants.STARTCODE_QUALIFIER).getValue();
405            assertNotNull(val);
406            assertFalse(val.length == 0);
407            long startCode = Bytes.toLong(val);
408            assertEquals(START_CODE, startCode);
409          }
410
411          if(serverName != null) {
412            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
413                HConstants.SERVER_QUALIFIER));
414            val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
415                HConstants.SERVER_QUALIFIER));
416            assertNotNull(val);
417            assertFalse(val.length == 0);
418            String server = Bytes.toString(val);
419            assertEquals(0, server.compareTo(serverName));
420          }
421        }
422      } finally {
423        InternalScanner s = scanner;
424        scanner = null;
425        if(s != null) {
426          s.close();
427        }
428      }
429    }
430  }
431
432  private boolean hasColumn(final List<Cell> kvs, final byte [] family,
433      final byte [] qualifier) {
434    for (Cell kv: kvs) {
435      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
436        return true;
437      }
438    }
439    return false;
440  }
441
442  private Cell getColumn(final List<Cell> kvs, final byte [] family,
443      final byte [] qualifier) {
444    for (Cell kv: kvs) {
445      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
446        return kv;
447      }
448    }
449    return null;
450  }
451
452
453  /** Use get to retrieve the HRegionInfo and validate it */
454  private void getRegionInfo(Table table) throws IOException {
455    Get get = new Get(ROW_KEY);
456    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
457    Result result = table.get(get);
458    byte [] bytes = result.value();
459    validateRegionInfo(bytes);
460  }
461
462  /**
463   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
464   * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
465   * HBase-910.
466   * @throws Exception
467   */
468  @Test
469  public void testScanAndSyncFlush() throws Exception {
470    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
471    Table hri = new RegionAsTable(region);
472    try {
473        LOG.info("Added: " +
474          HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
475            Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
476      int count = count(hri, -1, false);
477      assertEquals(count, count(hri, 100, false)); // do a sync flush.
478    } catch (Exception e) {
479      LOG.error("Failed", e);
480      throw e;
481    } finally {
482      HBaseTestingUtility.closeRegionAndWAL(this.region);
483    }
484  }
485
486  /**
487   * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
488   * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
489   *
490   * @throws Exception
491   */
492  @Test
493  public void testScanAndRealConcurrentFlush() throws Exception {
494    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
495    Table hri = new RegionAsTable(region);
496    try {
497        LOG.info("Added: " +
498          HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
499            Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
500      int count = count(hri, -1, false);
501      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
502    } catch (Exception e) {
503      LOG.error("Failed", e);
504      throw e;
505    } finally {
506      HBaseTestingUtility.closeRegionAndWAL(this.region);
507    }
508  }
509
510  /**
511   * Make sure scanner returns correct result when we run a major compaction
512   * with deletes.
513   *
514   * @throws Exception
515   */
516  @Test
517  @SuppressWarnings("deprecation")
518  public void testScanAndConcurrentMajorCompact() throws Exception {
519    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
520    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
521    Table hri = new RegionAsTable(region);
522
523    try {
524      HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
525          firstRowBytes, secondRowBytes);
526      HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
527          firstRowBytes, secondRowBytes);
528
529      Delete dc = new Delete(firstRowBytes);
530      /* delete column1 of firstRow */
531      dc.addColumns(fam1, col1);
532      region.delete(dc);
533      region.flush(true);
534
535      HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
536          secondRowBytes, thirdRowBytes);
537      HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
538          secondRowBytes, thirdRowBytes);
539      region.flush(true);
540
541      InternalScanner s = region.getScanner(new Scan());
542      // run a major compact, column1 of firstRow will be cleaned.
543      region.compact(true);
544
545      List<Cell> results = new ArrayList<>();
546      s.next(results);
547
548      // make sure returns column2 of firstRow
549      assertTrue("result is not correct, keyValues : " + results,
550          results.size() == 1);
551      assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes)); 
552      assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
553
554      results = new ArrayList<>();
555      s.next(results);
556
557      // get secondRow
558      assertTrue(results.size() == 2);
559      assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
560      assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
561      assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
562    } finally {
563      HBaseTestingUtility.closeRegionAndWAL(this.region);
564    }
565  }
566
567
568  /*
569   * @param hri Region
570   * @param flushIndex At what row we start the flush.
571   * @param concurrent if the flush should be concurrent or sync.
572   * @return Count of rows found.
573   * @throws IOException
574   */
575  private int count(final Table countTable, final int flushIndex, boolean concurrent)
576  throws IOException {
577    LOG.info("Taking out counting scan");
578    Scan scan = new Scan();
579    for (byte [] qualifier: EXPLICIT_COLS) {
580      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
581    }
582    ResultScanner s = countTable.getScanner(scan);
583    int count = 0;
584    boolean justFlushed = false;
585    while (s.next() != null) {
586      if (justFlushed) {
587        LOG.info("after next() just after next flush");
588        justFlushed = false;
589      }
590      count++;
591      if (flushIndex == count) {
592        LOG.info("Starting flush at flush index " + flushIndex);
593        Thread t = new Thread() {
594          @Override
595          public void run() {
596            try {
597              region.flush(true);
598              LOG.info("Finishing flush");
599            } catch (IOException e) {
600              LOG.info("Failed flush cache");
601            }
602          }
603        };
604        if (concurrent) {
605          t.start(); // concurrently flush.
606        } else {
607          t.run(); // sync flush
608        }
609        LOG.info("Continuing on after kicking off background flush");
610        justFlushed = true;
611      }
612    }
613    s.close();
614    LOG.info("Found " + count + " items");
615    return count;
616  }
617}