001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.List;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestCase;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HColumnDescriptor;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HRegionInfo;
039import org.apache.hadoop.hbase.HTableDescriptor;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.UnknownScannerException;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.ResultScanner;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.filter.Filter;
050import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
051import org.apache.hadoop.hbase.filter.PrefixFilter;
052import org.apache.hadoop.hbase.filter.WhileMatchFilter;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.testclassification.RegionServerTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * Test of a long-lived scanner validating as we go.
066 */
067@Category({RegionServerTests.class, MediumTests.class})
068public class TestScanner {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072      HBaseClassTestRule.forClass(TestScanner.class);
073
074  @Rule public TestName name = new TestName();
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
077  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
078
079  private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
080  private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
081  private static final byte [][] EXPLICIT_COLS = {
082    HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
083      // TODO ryan
084      //HConstants.STARTCODE_QUALIFIER
085  };
086
087  static final HTableDescriptor TESTTABLEDESC =
088    new HTableDescriptor(TableName.valueOf("testscanner"));
089  static {
090    TESTTABLEDESC.addFamily(
091        new HColumnDescriptor(HConstants.CATALOG_FAMILY)
092            // Ten is an arbitrary number.  Keep versions to help debugging.
093            .setMaxVersions(10)
094            .setBlockCacheEnabled(false)
095            .setBlocksize(8 * 1024)
096    );
097  }
098  /** HRegionInfo for root region */
099  public static final HRegionInfo REGION_INFO =
100    new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
101    HConstants.EMPTY_BYTE_ARRAY);
102
103  private static final byte [] ROW_KEY = REGION_INFO.getRegionName();
104
105  private static final long START_CODE = Long.MAX_VALUE;
106
107  private HRegion region;
108
109  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
110  final private byte[] col1;
111
112  public TestScanner() {
113    super();
114
115    firstRowBytes = START_KEY_BYTES;
116    secondRowBytes = START_KEY_BYTES.clone();
117    // Increment the least significant character so we get to next row.
118    secondRowBytes[START_KEY_BYTES.length - 1]++;
119    thirdRowBytes = START_KEY_BYTES.clone();
120    thirdRowBytes[START_KEY_BYTES.length - 1] =
121        (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
122    col1 = Bytes.toBytes("column1");
123  }
124
125  /**
126   * Test basic stop row filter works.
127   */
128  @Test
129  public void testStopRow() throws Exception {
130    byte [] startrow = Bytes.toBytes("bbb");
131    byte [] stoprow = Bytes.toBytes("ccc");
132    try {
133      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
134      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
135      List<Cell> results = new ArrayList<>();
136      // Do simple test of getting one row only first.
137      Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
138      scan.addFamily(HConstants.CATALOG_FAMILY);
139
140      InternalScanner s = region.getScanner(scan);
141      int count = 0;
142      while (s.next(results)) {
143        count++;
144      }
145      s.close();
146      assertEquals(0, count);
147      // Now do something a bit more imvolved.
148      scan = new Scan(startrow, stoprow);
149      scan.addFamily(HConstants.CATALOG_FAMILY);
150
151      s = region.getScanner(scan);
152      count = 0;
153      Cell kv = null;
154      results = new ArrayList<>();
155      for (boolean first = true; s.next(results);) {
156        kv = results.get(0);
157        if (first) {
158          assertTrue(CellUtil.matchingRows(kv,  startrow));
159          first = false;
160        }
161        count++;
162      }
163      assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
164      // We got something back.
165      assertTrue(count > 10);
166      s.close();
167    } finally {
168      HBaseTestingUtility.closeRegionAndWAL(this.region);
169    }
170  }
171
172  void rowPrefixFilter(Scan scan) throws IOException {
173    List<Cell> results = new ArrayList<>();
174    scan.addFamily(HConstants.CATALOG_FAMILY);
175    InternalScanner s = region.getScanner(scan);
176    boolean hasMore = true;
177    while (hasMore) {
178      hasMore = s.next(results);
179      for (Cell kv : results) {
180        assertEquals((byte)'a', CellUtil.cloneRow(kv)[0]);
181        assertEquals((byte)'b', CellUtil.cloneRow(kv)[1]);
182      }
183      results.clear();
184    }
185    s.close();
186  }
187
188  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
189    List<Cell> results = new ArrayList<>();
190    scan.addFamily(HConstants.CATALOG_FAMILY);
191    InternalScanner s = region.getScanner(scan);
192    boolean hasMore = true;
193    while (hasMore) {
194      hasMore = s.next(results);
195      for (Cell kv : results) {
196        assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
197      }
198      results.clear();
199    }
200    s.close();
201  }
202
203  @Test
204  public void testFilters() throws IOException {
205    try {
206      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
207      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
208      byte [] prefix = Bytes.toBytes("ab");
209      Filter newFilter = new PrefixFilter(prefix);
210      Scan scan = new Scan();
211      scan.setFilter(newFilter);
212      rowPrefixFilter(scan);
213
214      byte[] stopRow = Bytes.toBytes("bbc");
215      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
216      scan = new Scan();
217      scan.setFilter(newFilter);
218      rowInclusiveStopFilter(scan, stopRow);
219
220    } finally {
221      HBaseTestingUtility.closeRegionAndWAL(this.region);
222    }
223  }
224
225  /**
226   * Test that closing a scanner while a client is using it doesn't throw
227   * NPEs but instead a UnknownScannerException. HBASE-2503
228   */
229  @Test
230  public void testRaceBetweenClientAndTimeout() throws Exception {
231    try {
232      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
233      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
234      Scan scan = new Scan();
235      InternalScanner s = region.getScanner(scan);
236      List<Cell> results = new ArrayList<>();
237      try {
238        s.next(results);
239        s.close();
240        s.next(results);
241        fail("We don't want anything more, we should be failing");
242      } catch (UnknownScannerException ex) {
243        // ok!
244        return;
245      }
246    } finally {
247      HBaseTestingUtility.closeRegionAndWAL(this.region);
248    }
249  }
250
251  /** The test!
252   */
253  @Test
254  public void testScanner() throws IOException {
255    try {
256      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
257      Table table = new RegionAsTable(region);
258
259      // Write information to the meta table
260
261      Put put = new Put(ROW_KEY, System.currentTimeMillis());
262
263      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
264          REGION_INFO.toByteArray());
265      table.put(put);
266
267      // What we just committed is in the memstore. Verify that we can get
268      // it back both with scanning and get
269
270      scan(false, null);
271      getRegionInfo(table);
272
273      // Close and re-open
274
275      ((HRegion)region).close();
276      region = HRegion.openHRegion(region, null);
277      table = new RegionAsTable(region);
278
279      // Verify we can get the data back now that it is on disk.
280
281      scan(false, null);
282      getRegionInfo(table);
283
284      // Store some new information
285
286      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
287
288      put = new Put(ROW_KEY, System.currentTimeMillis());
289      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
290          Bytes.toBytes(address));
291
292//      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
293
294      table.put(put);
295
296      // Validate that we can still get the HRegionInfo, even though it is in
297      // an older row on disk and there is a newer row in the memstore
298
299      scan(true, address.toString());
300      getRegionInfo(table);
301
302      // flush cache
303      this.region.flush(true);
304
305      // Validate again
306
307      scan(true, address.toString());
308      getRegionInfo(table);
309
310      // Close and reopen
311
312      ((HRegion)region).close();
313      region = HRegion.openHRegion(region,null);
314      table = new RegionAsTable(region);
315
316      // Validate again
317
318      scan(true, address.toString());
319      getRegionInfo(table);
320
321      // Now update the information again
322
323      address = "bar.foo.com:4321";
324
325      put = new Put(ROW_KEY, System.currentTimeMillis());
326
327      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
328      table.put(put);
329
330      // Validate again
331
332      scan(true, address.toString());
333      getRegionInfo(table);
334
335      // flush cache
336
337      region.flush(true);
338
339      // Validate again
340
341      scan(true, address.toString());
342      getRegionInfo(table);
343
344      // Close and reopen
345
346      ((HRegion)this.region).close();
347      this.region = HRegion.openHRegion(region, null);
348      table = new RegionAsTable(this.region);
349
350      // Validate again
351
352      scan(true, address.toString());
353      getRegionInfo(table);
354
355    } finally {
356      // clean up
357      HBaseTestingUtility.closeRegionAndWAL(this.region);
358    }
359  }
360
361  /** Compare the HRegionInfo we read from HBase to what we stored */
362  private void validateRegionInfo(byte [] regionBytes) throws IOException {
363    HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
364
365    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
366    assertEquals(0, info.getStartKey().length);
367    assertEquals(0, info.getEndKey().length);
368    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
369    //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
370  }
371
372  /** Use a scanner to get the region info and then validate the results */
373  private void scan(boolean validateStartcode, String serverName)
374      throws IOException {
375    InternalScanner scanner = null;
376    Scan scan = null;
377    List<Cell> results = new ArrayList<>();
378    byte [][][] scanColumns = {COLS, EXPLICIT_COLS};
379    for(int i = 0; i < scanColumns.length; i++) {
380      try {
381        scan = new Scan(FIRST_ROW);
382        for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
383          scan.addColumn(COLS[0],  EXPLICIT_COLS[ii]);
384        }
385        scanner = region.getScanner(scan);
386        while (scanner.next(results)) {
387          assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
388              HConstants.REGIONINFO_QUALIFIER));
389          byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
390              HConstants.REGIONINFO_QUALIFIER));
391          validateRegionInfo(val);
392          if(validateStartcode) {
393//            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
394//                HConstants.STARTCODE_QUALIFIER));
395//            val = getColumn(results, HConstants.CATALOG_FAMILY,
396//                HConstants.STARTCODE_QUALIFIER).getValue();
397            assertNotNull(val);
398            assertFalse(val.length == 0);
399            long startCode = Bytes.toLong(val);
400            assertEquals(START_CODE, startCode);
401          }
402
403          if(serverName != null) {
404            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
405                HConstants.SERVER_QUALIFIER));
406            val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
407                HConstants.SERVER_QUALIFIER));
408            assertNotNull(val);
409            assertFalse(val.length == 0);
410            String server = Bytes.toString(val);
411            assertEquals(0, server.compareTo(serverName));
412          }
413        }
414      } finally {
415        InternalScanner s = scanner;
416        scanner = null;
417        if(s != null) {
418          s.close();
419        }
420      }
421    }
422  }
423
424  private boolean hasColumn(final List<Cell> kvs, final byte [] family,
425      final byte [] qualifier) {
426    for (Cell kv: kvs) {
427      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
428        return true;
429      }
430    }
431    return false;
432  }
433
434  private Cell getColumn(final List<Cell> kvs, final byte [] family,
435      final byte [] qualifier) {
436    for (Cell kv: kvs) {
437      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
438        return kv;
439      }
440    }
441    return null;
442  }
443
444
445  /** Use get to retrieve the HRegionInfo and validate it */
446  private void getRegionInfo(Table table) throws IOException {
447    Get get = new Get(ROW_KEY);
448    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
449    Result result = table.get(get);
450    byte [] bytes = result.value();
451    validateRegionInfo(bytes);
452  }
453
454  /**
455   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
456   * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
457   * HBase-910.
458   */
459  @Test
460  public void testScanAndSyncFlush() throws Exception {
461    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
462    Table hri = new RegionAsTable(region);
463    try {
464      LOG.info("Added: " +
465        HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
466          Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
467      int count = count(hri, -1, false);
468      assertEquals(count, count(hri, 100, false)); // do a sync flush.
469    } catch (Exception e) {
470      LOG.error("Failed", e);
471      throw e;
472    } finally {
473      HBaseTestingUtility.closeRegionAndWAL(this.region);
474    }
475  }
476
477  /**
478   * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
479   * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
480   */
481  @Test
482  public void testScanAndRealConcurrentFlush() throws Exception {
483    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
484    Table hri = new RegionAsTable(region);
485    try {
486      LOG.info("Added: " +
487        HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
488          Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
489      int count = count(hri, -1, false);
490      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
491    } catch (Exception e) {
492      LOG.error("Failed", e);
493      throw e;
494    } finally {
495      HBaseTestingUtility.closeRegionAndWAL(this.region);
496    }
497  }
498
499  /**
500   * Make sure scanner returns correct result when we run a major compaction
501   * with deletes.
502   */
503  @Test
504  @SuppressWarnings("deprecation")
505  public void testScanAndConcurrentMajorCompact() throws Exception {
506    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
507    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
508    Table hri = new RegionAsTable(region);
509
510    try {
511      HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
512          firstRowBytes, secondRowBytes);
513      HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
514          firstRowBytes, secondRowBytes);
515
516      Delete dc = new Delete(firstRowBytes);
517      /* delete column1 of firstRow */
518      dc.addColumns(fam1, col1);
519      region.delete(dc);
520      region.flush(true);
521
522      HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
523          secondRowBytes, thirdRowBytes);
524      HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
525          secondRowBytes, thirdRowBytes);
526      region.flush(true);
527
528      InternalScanner s = region.getScanner(new Scan());
529      // run a major compact, column1 of firstRow will be cleaned.
530      region.compact(true);
531
532      List<Cell> results = new ArrayList<>();
533      s.next(results);
534
535      // make sure returns column2 of firstRow
536      assertTrue("result is not correct, keyValues : " + results,
537          results.size() == 1);
538      assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes)); 
539      assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
540
541      results = new ArrayList<>();
542      s.next(results);
543
544      // get secondRow
545      assertTrue(results.size() == 2);
546      assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
547      assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
548      assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
549    } finally {
550      HBaseTestingUtility.closeRegionAndWAL(this.region);
551    }
552  }
553
554
555  /*
556   * @param hri Region
557   * @param flushIndex At what row we start the flush.
558   * @param concurrent if the flush should be concurrent or sync.
559   * @return Count of rows found.
560   * @throws IOException
561   */
562  private int count(final Table countTable, final int flushIndex, boolean concurrent)
563      throws IOException {
564    LOG.info("Taking out counting scan");
565    Scan scan = new Scan();
566    for (byte [] qualifier: EXPLICIT_COLS) {
567      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
568    }
569    ResultScanner s = countTable.getScanner(scan);
570    int count = 0;
571    boolean justFlushed = false;
572    while (s.next() != null) {
573      if (justFlushed) {
574        LOG.info("after next() just after next flush");
575        justFlushed = false;
576      }
577      count++;
578      if (flushIndex == count) {
579        LOG.info("Starting flush at flush index " + flushIndex);
580        Thread t = new Thread() {
581          @Override
582          public void run() {
583            try {
584              region.flush(true);
585              LOG.info("Finishing flush");
586            } catch (IOException e) {
587              LOG.info("Failed flush cache");
588            }
589          }
590        };
591        if (concurrent) {
592          t.start(); // concurrently flush.
593        } else {
594          t.run(); // sync flush
595        }
596        LOG.info("Continuing on after kicking off background flush");
597        justFlushed = true;
598      }
599    }
600    s.close();
601    LOG.info("Found " + count + " items");
602    return count;
603  }
604}