/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

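/**
 * Tests for {@link TableSnapshotScanner}: scanning snapshot files directly from the filesystem,
 * including snapshots taken across splits and merges, and scans performed while the HBase
 * cluster is offline.
 */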
@Category({LargeTests.class, ClientTests.class})
public class TestTableSnapshotScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final int NUM_REGION_SERVERS = 2;
  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
  public static byte[] bbb = Bytes.toBytes("bbb");
  public static byte[] yyy = Bytes.toBytes("yyy");

  private FileSystem fs;
  private Path rootDir;

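  /**
   * Polls the table's region list, up to roughly 100 seconds, until it contains at least
   * {@code expectedRegionSize} regions, i.e. until the split has finished.
   */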
  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
      int expectedRegionSize) throws Exception {
    for (int i = 0; i < 100; i++) {
      List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName);
      if (hRegionInfoList.size() >= expectedRegionSize) {
        break;
      }
      Thread.sleep(1000);
    }
  }

  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numRegionServers(NUM_REGION_SERVERS).numDataNodes(NUM_REGION_SERVERS)
        .createRootDir(true).build();
    UTIL.startMiniCluster(option);
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  }

  public void tearDownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private static void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  @After
  public void tearDown() throws Exception {
  }

  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
      String snapshotName, int numRegions)
      throws Exception {
    try {
      util.deleteTable(tableName);
    } catch (Exception ex) {
      // ignore
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    Admin admin = util.getAdmin();

    // put some stuff in the table
    Table table = util.getConnection().getTable(tableName);
    util.loadTable(table, FAMILIES);

    Path rootDir = FSUtils.getRootDir(util.getConfiguration());
    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

    // load different values
    byte[] value = Bytes.toBytes("after_snapshot_value");
    util.loadTable(table, FAMILIES, value);

    // cause flush to create new files in the region
    admin.flush(tableName);
    table.close();
  }

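  /**
   * Scans a snapshot taken just after a table split and verifies that the expected row range is
   * returned without duplicate results from the parent and daughter regions.
   */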
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      TableSnapshotScanner scanner =
          new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);

      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Test
  public void testWithSingleRegion() throws Exception {
    testScanner(UTIL, "testWithSingleRegion", 1, false);
  }

  @Test
  public void testWithMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 10, false);
  }

  @Test
  public void testWithOfflineHBaseMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 20, true);
  }

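  /**
   * Restores the snapshot with {@link RestoreSnapshotHelper#copySnapshotForScanner} and then
   * scans the pre-restored directory several times, checking that closing a scanner does not
   * delete the restored snapshot files.
   */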
  @Test
  public void testScannerWithRestoreScanner() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    String snapshotName = "testScannerWithRestoreScanner";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(bbb, yyy); // limit the scan

      Configuration conf = UTIL.getConfiguration();
      Path rootDir = FSUtils.getRootDir(conf);

      TableSnapshotScanner scanner0 =
          new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
      verifyScanner(scanner0, bbb, yyy);
      scanner0.close();

      // restore snapshot.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // scan the snapshot without restoring snapshot
      TableSnapshotScanner scanner =
          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();

      // check whether the snapshot has been deleted by the close of scanner.
      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();

      // restore snapshot again.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // check whether the snapshot has been deleted by the close of scanner.
      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
      boolean shutdownCluster) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    try {
      createTableAndSnapshot(util, tableName, snapshotName, numRegions);

      if (shutdownCluster) {
        util.shutdownMiniHBaseCluster();
      }

      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(bbb, yyy); // limit the scan

      TableSnapshotScanner scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir,
        snapshotName, scan);

      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
        tearDownCluster();
      }
    }
  }

  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {

    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    while (true) {
      Result result = scanner.next();
      if (result == null) {
        break;
      }
      verifyRow(result);
      rowTracker.addRow(result.getRow());
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  private static void verifyRow(Result result) throws IOException {
    byte[] row = result.getRow();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      // assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
    }

    for (int j = 0; j < FAMILIES.length; j++) {
      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
          + ", actual:" + Bytes.toString(actual), row, actual);
    }
  }

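  /**
   * Regression test for scanning a snapshot that references a merged region: merge two of three
   * regions, snapshot the table, major compact the merged region, let the catalog janitor and the
   * HFile cleaner archive and expire the parent region files, then verify the snapshot can still
   * be scanned without hitting a FileNotFoundException.
   */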
  @Test
  public void testMergeRegion() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testMergeRegion");
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    long timeout = 20000; // 20s
    try (Admin admin = UTIL.getAdmin()) {
      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
          .collect(Collectors.toList());
      // create table with 3 regions
      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      RegionInfo region0 = regions.get(0);
      RegionInfo region1 = regions.get(1);
      RegionInfo region2 = regions.get(2);
      // put some data in the table
      UTIL.loadTable(table, FAMILIES);
      admin.flush(tableName);
      // wait until the flush is finished
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          for (RegionInfo region : regions) {
            Path regionDir = new Path(tableDir, region.getEncodedName());
            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
              if (fs.listStatus(familyDir).length != 1) {
                return false;
              }
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check if flush is finished", e);
          return false;
        }
      });
      // merge 2 regions
      admin.compactionSwitch(false, serverList);
      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
        true);
      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
      RegionInfo mergedRegion =
          mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
              ? mergedRegions.get(1)
              : mergedRegions.get(0);
      // snapshot
      admin.snapshot(snapshotName, tableName);
      Assert.assertEquals(1, admin.listSnapshots().size());
      // major compact
      admin.compactionSwitch(true, serverList);
      admin.majorCompactRegion(mergedRegion.getRegionName());
      // wait until the merged region has no reference files
      UTIL.waitFor(timeout, () -> {
        try {
          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
              .getRegionServerThreads()) {
            HRegionServer regionServer = regionServerThread.getRegionServer();
            for (HRegion subRegion : regionServer.getRegions(tableName)) {
              if (subRegion.getRegionInfo().getEncodedName()
                  .equals(mergedRegion.getEncodedName())) {
                regionServer.getCompactedHFilesDischarger().chore();
              }
            }
          }
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          HRegionFileSystem regionFs = HRegionFileSystem
              .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
          return !regionFs.hasReferences(admin.getDescriptor(tableName));
        } catch (IOException e) {
          LOG.warn("Failed to check if the merged region has no reference", e);
          return false;
        }
      });
      // run the catalog janitor and wait until the parent regions are archived
      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
            String name = fileStatus.getPath().getName();
            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check if the parent regions are archived", e);
          return false;
        }
      });
      // set the archived files' modification time into the past and then run the cleaner
      long time = System.currentTimeMillis() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
      // scan the snapshot
      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
          UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    } catch (Exception e) {
      LOG.error("scan snapshot error", e);
      // the snapshot scan must still succeed after the merged parents have been archived
      Assert.fail("Should not throw FileNotFoundException");
    } finally {
      tearDownCluster();
    }
  }

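  /**
   * Recursively sets the modification time of {@code path} and of every file and directory
   * below it, so the TTL-based HFile cleaner treats the archived files as expired.
   */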
  private void traverseAndSetFileTime(Path path, long time) throws IOException {
    fs.setTimes(path, time, -1);
    if (fs.isDirectory(path)) {
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
      List<FileStatus> subDirs =
          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
      for (FileStatus subDir : subDirs) {
        traverseAndSetFileTime(subDir.getPath(), time);
      }
      for (FileStatus file : files) {
        fs.setTimes(file.getPath(), time, -1);
      }
    }
  }
}