/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.List;
024import java.util.stream.Collectors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.StartTestingClusterOption;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
036import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
037import org.apache.hadoop.hbase.regionserver.HRegion;
038import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
039import org.apache.hadoop.hbase.regionserver.HRegionServer;
040import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
041import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.CommonFSUtils;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.util.FSUtils;
048import org.apache.hadoop.hbase.util.HFileArchiveUtil;
049import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
050import org.junit.After;
051import org.junit.Assert;
052import org.junit.ClassRule;
053import org.junit.Rule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.rules.TestName;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060@Category({LargeTests.class, ClientTests.class})
061public class TestTableSnapshotScanner {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
068  private final HBaseTestingUtil UTIL = new HBaseTestingUtil();
069  private static final int NUM_REGION_SERVERS = 2;
070  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
071  public static byte[] bbb = Bytes.toBytes("bbb");
072  public static byte[] yyy = Bytes.toBytes("yyy");
073
074  private FileSystem fs;
075  private Path rootDir;
076
077  @Rule
078  public TestName name = new TestName();
079
080  public static void blockUntilSplitFinished(HBaseTestingUtil util, TableName tableName,
081      int expectedRegionSize) throws Exception {
082    for (int i = 0; i < 100; i++) {
083      List<RegionInfo> hRegionInfoList = util.getAdmin().getRegions(tableName);
084      if (hRegionInfoList.size() >= expectedRegionSize) {
085        break;
086      }
087      Thread.sleep(1000);
088    }
089  }
090
091  public void setupCluster() throws Exception {
092    setupConf(UTIL.getConfiguration());
093    StartTestingClusterOption option = StartTestingClusterOption.builder()
094        .numRegionServers(NUM_REGION_SERVERS).numDataNodes(NUM_REGION_SERVERS)
095        .createRootDir(true).build();
096    UTIL.startMiniCluster(option);
097    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
098    fs = rootDir.getFileSystem(UTIL.getConfiguration());
099  }
100
101  public void tearDownCluster() throws Exception {
102    UTIL.shutdownMiniCluster();
103  }
104
105  private static void setupConf(Configuration conf) {
106    // Enable snapshot
107    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
108  }
109
110  @After
111  public void tearDown() throws Exception {
112  }
113
114  public static void createTableAndSnapshot(HBaseTestingUtil util, TableName tableName,
115      String snapshotName, int numRegions)
116      throws Exception {
117    try {
118      util.deleteTable(tableName);
119    } catch(Exception ex) {
120      // ignore
121    }
122
123    if (numRegions > 1) {
124      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
125    } else {
126      util.createTable(tableName, FAMILIES);
127    }
128    Admin admin = util.getAdmin();
129
130    // put some stuff in the table
131    Table table = util.getConnection().getTable(tableName);
132    util.loadTable(table, FAMILIES);
133
134    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
135    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
136
137    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
138        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
139
140    // load different values
141    byte[] value = Bytes.toBytes("after_snapshot_value");
142    util.loadTable(table, FAMILIES, value);
143
144    // cause flush to create new files in the region
145    admin.flush(tableName);
146    table.close();
147  }
148
149  @Test
150  public void testNoDuplicateResultsWhenSplitting() throws Exception {
151    setupCluster();
152    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
153    String snapshotName = "testSnapshotBug";
154    try {
155      if (UTIL.getAdmin().tableExists(tableName)) {
156        UTIL.deleteTable(tableName);
157      }
158
159      UTIL.createTable(tableName, FAMILIES);
160      Admin admin = UTIL.getAdmin();
161
162      // put some stuff in the table
163      Table table = UTIL.getConnection().getTable(tableName);
164      UTIL.loadTable(table, FAMILIES);
165
166      // split to 2 regions
167      admin.split(tableName, Bytes.toBytes("eee"));
168      blockUntilSplitFinished(UTIL, tableName, 2);
169
170      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
171      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
172
173      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
174        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
175
176      // load different values
177      byte[] value = Bytes.toBytes("after_snapshot_value");
178      UTIL.loadTable(table, FAMILIES, value);
179
180      // cause flush to create new files in the region
181      admin.flush(tableName);
182      table.close();
183
184      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
185      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
186
187      TableSnapshotScanner scanner =
188          new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
189
190      verifyScanner(scanner, bbb, yyy);
191      scanner.close();
192    } catch (Exception e) {
193      e.printStackTrace();
194    } finally {
195      UTIL.getAdmin().deleteSnapshot(snapshotName);
196      UTIL.deleteTable(tableName);
197      tearDownCluster();
198    }
199  }
200
201
202  @Test
203  public void testScanLimit() throws Exception {
204    setupCluster();
205    final TableName tableName = TableName.valueOf(name.getMethodName());
206    final String snapshotName = tableName + "Snapshot";
207    TableSnapshotScanner scanner = null;
208    try {
209      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
210      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
211      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan
212
213      scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
214      int count = 0;
215      while (true) {
216        Result result = scanner.next();
217        if (result == null) {
218          break;
219        }
220        count++;
221      }
222      Assert.assertEquals(100, count);
223    } finally {
224      if (scanner != null) {
225        scanner.close();
226      }
227      UTIL.getAdmin().deleteSnapshot(snapshotName);
228      UTIL.deleteTable(tableName);
229      tearDownCluster();
230    }
231  }
232
233  @Test
234  public void testWithSingleRegion() throws Exception {
235    testScanner(UTIL, "testWithSingleRegion", 1, false);
236  }
237
238  @Test
239  public void testWithMultiRegion() throws Exception {
240    testScanner(UTIL, "testWithMultiRegion", 10, false);
241  }
242
243  @Test
244  public void testWithOfflineHBaseMultiRegion() throws Exception {
245    testScanner(UTIL, "testWithMultiRegion", 20, true);
246  }
247
248  @Test
249  public void testScannerWithRestoreScanner() throws Exception {
250    setupCluster();
251    TableName tableName = TableName.valueOf("testScanner");
252    String snapshotName = "testScannerWithRestoreScanner";
253    try {
254      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
255      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
256      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
257
258      Configuration conf = UTIL.getConfiguration();
259      Path rootDir = CommonFSUtils.getRootDir(conf);
260
261      TableSnapshotScanner scanner0 =
262          new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
263      verifyScanner(scanner0, bbb, yyy);
264      scanner0.close();
265
266      // restore snapshot.
267      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
268
269      // scan the snapshot without restoring snapshot
270      TableSnapshotScanner scanner =
271          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
272      verifyScanner(scanner, bbb, yyy);
273      scanner.close();
274
275      // check whether the snapshot has been deleted by the close of scanner.
276      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
277      verifyScanner(scanner, bbb, yyy);
278      scanner.close();
279
280      // restore snapshot again.
281      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
282
283      // check whether the snapshot has been deleted by the close of scanner.
284      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
285      verifyScanner(scanner, bbb, yyy);
286      scanner.close();
287    } finally {
288      UTIL.getAdmin().deleteSnapshot(snapshotName);
289      UTIL.deleteTable(tableName);
290      tearDownCluster();
291    }
292  }
293
294  private void testScanner(HBaseTestingUtil util, String snapshotName, int numRegions,
295      boolean shutdownCluster) throws Exception {
296    setupCluster();
297    TableName tableName = TableName.valueOf("testScanner");
298    try {
299      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
300
301      if (shutdownCluster) {
302        util.shutdownMiniHBaseCluster();
303      }
304
305      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
306      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
307
308      TableSnapshotScanner scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir,
309        snapshotName, scan);
310
311      verifyScanner(scanner, bbb, yyy);
312      scanner.close();
313    } finally {
314      if (!shutdownCluster) {
315        util.getAdmin().deleteSnapshot(snapshotName);
316        util.deleteTable(tableName);
317        tearDownCluster();
318      }
319    }
320  }
321
322  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
323      throws IOException, InterruptedException {
324
325    HBaseTestingUtil.SeenRowTracker rowTracker =
326        new HBaseTestingUtil.SeenRowTracker(startRow, stopRow);
327
328    while (true) {
329      Result result = scanner.next();
330      if (result == null) {
331        break;
332      }
333      verifyRow(result);
334      rowTracker.addRow(result.getRow());
335    }
336
337    // validate all rows are seen
338    rowTracker.validate();
339  }
340
341  private static void verifyRow(Result result) throws IOException {
342    byte[] row = result.getRow();
343    CellScanner scanner = result.cellScanner();
344    while (scanner.advance()) {
345      Cell cell = scanner.current();
346
347      //assert that all Cells in the Result have the same key
348     Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
349         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
350    }
351
352    for (int j = 0; j < FAMILIES.length; j++) {
353      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
354      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
355          + " ,actual:" + Bytes.toString(actual), row, actual);
356    }
357  }
358
359  @Test
360  public void testMergeRegion() throws Exception {
361    setupCluster();
362    TableName tableName = TableName.valueOf("testMergeRegion");
363    String snapshotName = tableName.getNameAsString() + "_snapshot";
364    Configuration conf = UTIL.getConfiguration();
365    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
366    long timeout = 20000; // 20s
367    try (Admin admin = UTIL.getAdmin()) {
368      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
369          .collect(Collectors.toList());
370      // create table with 3 regions
371      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
372      List<RegionInfo> regions = admin.getRegions(tableName);
373      Assert.assertEquals(3, regions.size());
374      RegionInfo region0 = regions.get(0);
375      RegionInfo region1 = regions.get(1);
376      RegionInfo region2 = regions.get(2);
377      // put some data in the table
378      UTIL.loadTable(table, FAMILIES);
379      admin.flush(tableName);
380      // wait flush is finished
381      UTIL.waitFor(timeout, () -> {
382        try {
383          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
384          for (RegionInfo region : regions) {
385            Path regionDir = new Path(tableDir, region.getEncodedName());
386            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
387              if (fs.listStatus(familyDir).length != 1) {
388                return false;
389              }
390            }
391          }
392          return true;
393        } catch (IOException e) {
394          LOG.warn("Failed check if flush is finished", e);
395          return false;
396        }
397      });
398      // merge 2 regions
399      admin.compactionSwitch(false, serverList);
400      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
401        true);
402      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
403      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
404      RegionInfo mergedRegion =
405          mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
406              ? mergedRegions.get(1)
407              : mergedRegions.get(0);
408      // snapshot
409      admin.snapshot(snapshotName, tableName);
410      Assert.assertEquals(1, admin.listSnapshots().size());
411      // major compact
412      admin.compactionSwitch(true, serverList);
413      admin.majorCompactRegion(mergedRegion.getRegionName());
414      // wait until merged region has no reference
415      UTIL.waitFor(timeout, () -> {
416        try {
417          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
418              .getRegionServerThreads()) {
419            HRegionServer regionServer = regionServerThread.getRegionServer();
420            for (HRegion subRegion : regionServer.getRegions(tableName)) {
421              if (subRegion.getRegionInfo().getEncodedName()
422                  .equals(mergedRegion.getEncodedName())) {
423                regionServer.getCompactedHFilesDischarger().chore();
424              }
425            }
426          }
427          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
428          HRegionFileSystem regionFs = HRegionFileSystem
429              .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
430          return !regionFs.hasReferences(admin.getDescriptor(tableName));
431        } catch (IOException e) {
432          LOG.warn("Failed check merged region has no reference", e);
433          return false;
434        }
435      });
436      // run catalog janitor to clean and wait for parent regions are archived
437      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
438      UTIL.waitFor(timeout, () -> {
439        try {
440          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
441          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
442            String name = fileStatus.getPath().getName();
443            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
444              return false;
445            }
446          }
447          return true;
448        } catch (IOException e) {
449          LOG.warn("Check if parent regions are archived error", e);
450          return false;
451        }
452      });
453      // set file modify time and then run cleaner
454      long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
455      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
456      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
457      // scan snapshot
458      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
459        UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
460        new Scan().withStartRow(bbb).withStopRow(yyy))) {
461        verifyScanner(scanner, bbb, yyy);
462      }
463    } catch (Exception e) {
464      LOG.error("scan snapshot error", e);
465      Assert.fail("Should not throw Exception: " + e.getMessage());
466    } finally {
467      tearDownCluster();
468    }
469  }
470
471  @Test
472  public void testDeleteTableWithMergedRegions() throws Exception {
473    setupCluster();
474    final TableName tableName = TableName.valueOf(this.name.getMethodName());
475    String snapshotName = tableName.getNameAsString() + "_snapshot";
476    Configuration conf = UTIL.getConfiguration();
477    try (Admin admin = UTIL.getConnection().getAdmin()) {
478      // disable compaction
479      admin.compactionSwitch(false,
480        admin.getRegionServers().stream().map(s -> s.getServerName()).collect(Collectors.toList()));
481      // create table
482      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
483      List<RegionInfo> regions = admin.getRegions(tableName);
484      Assert.assertEquals(3, regions.size());
485      // write some data
486      UTIL.loadTable(table, FAMILIES);
487      // merge region
488      admin.mergeRegionsAsync(new byte[][] { regions.get(0).getEncodedNameAsBytes(),
489          regions.get(1).getEncodedNameAsBytes() },
490        false).get();
491      regions = admin.getRegions(tableName);
492      Assert.assertEquals(2, regions.size());
493      // snapshot
494      admin.snapshot(snapshotName, tableName);
495      // verify snapshot
496      try (TableSnapshotScanner scanner =
497          new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
498              new Scan().withStartRow(bbb).withStopRow(yyy))) {
499        verifyScanner(scanner, bbb, yyy);
500      }
501      // drop table
502      admin.disableTable(tableName);
503      admin.deleteTable(tableName);
504      // verify snapshot
505      try (TableSnapshotScanner scanner =
506          new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
507              new Scan().withStartRow(bbb).withStopRow(yyy))) {
508        verifyScanner(scanner, bbb, yyy);
509      }
510    }
511  }
512
513  private void traverseAndSetFileTime(Path path, long time) throws IOException {
514    fs.setTimes(path, time, -1);
515    if (fs.isDirectory(path)) {
516      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
517      List<FileStatus> subDirs =
518          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
519      List<FileStatus> files =
520          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
521      for (FileStatus subDir : subDirs) {
522        traverseAndSetFileTime(subDir.getPath(), time);
523      }
524      for (FileStatus file : files) {
525        fs.setTimes(file.getPath(), time, -1);
526      }
527    }
528  }
529}