/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({LargeTests.class, ClientTests.class})
public class TestTableSnapshotScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final int NUM_REGION_SERVERS = 2;
  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
  public static byte[] bbb = Bytes.toBytes("bbb");
  public static byte[] yyy = Bytes.toBytes("yyy");

  private FileSystem fs;
  private Path rootDir;

  @Rule
  public TestName name = new TestName();

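  /**
   * Polls once per second, for up to 100 seconds, until the table has at least
   * {@code expectedRegionSize} regions. Returns silently if the split has not finished in time.
   */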
  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
      int expectedRegionSize) throws Exception {
    for (int i = 0; i < 100; i++) {
      List<RegionInfo> hRegionInfoList = util.getAdmin().getRegions(tableName);
      if (hRegionInfoList.size() >= expectedRegionSize) {
        break;
      }
      Thread.sleep(1000);
    }
  }

  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numRegionServers(NUM_REGION_SERVERS).numDataNodes(NUM_REGION_SERVERS)
        .createRootDir(true).build();
    UTIL.startMiniCluster(option);
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  }

  public void tearDownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private static void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  @After
  public void tearDown() throws Exception {
  }

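  /**
   * Creates the table (pre-split when {@code numRegions > 1}), loads it, takes and validates a
   * snapshot named {@code snapshotName}, then loads different values and flushes so that the
   * live table diverges from the snapshot contents.
   */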
  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
      String snapshotName, int numRegions)
      throws Exception {
    try {
      util.deleteTable(tableName);
    } catch (Exception ex) {
      // ignore
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    Admin admin = util.getAdmin();

    // put some stuff in the table
    Table table = util.getConnection().getTable(tableName);
    util.loadTable(table, FAMILIES);

    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

    // load different values
    byte[] value = Bytes.toBytes("after_snapshot_value");
    util.loadTable(table, FAMILIES, value);

    // cause flush to create new files in the region
    admin.flush(tableName);
    table.close();
  }

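  /**
   * Splits the table, takes a snapshot of the freshly split table, and scans the snapshot,
   * guarding against duplicate or missing rows in the [bbb, yyy) range.
   */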
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      TableSnapshotScanner scanner =
          new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);

      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

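  /**
   * Verifies that a {@link Scan} limit is honored when scanning a snapshot: exactly 100 rows
   * must be returned even though the snapshot contains many more.
   */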
  @Test
  public void testScanLimit() throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    TableSnapshotScanner scanner = null;
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan

      scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
      int count = 0;
      while (true) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        count++;
      }
      Assert.assertEquals(100, count);
    } finally {
      if (scanner != null) {
        scanner.close();
      }
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

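  // The next three tests drive testScanner() with a single region, with multiple regions, and
  // with the HBase cluster (but not HDFS) shut down before the snapshot is scanned.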
  @Test
  public void testWithSingleRegion() throws Exception {
    testScanner(UTIL, "testWithSingleRegion", 1, false);
  }

  @Test
  public void testWithMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 10, false);
  }

  @Test
  public void testWithOfflineHBaseMultiRegion() throws Exception {
    testScanner(UTIL, "testWithOfflineHBaseMultiRegion", 20, true);
  }

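  /**
   * Scans a snapshot that was restored into {@code restoreDir} up front and verifies that
   * closing such a scanner does not delete the restored files, so the same restore directory
   * can be scanned repeatedly.
   */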
  @Test
  public void testScannerWithRestoreScanner() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    String snapshotName = "testScannerWithRestoreScanner";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      Configuration conf = UTIL.getConfiguration();
      Path rootDir = CommonFSUtils.getRootDir(conf);

      TableSnapshotScanner scanner0 =
          new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
      verifyScanner(scanner0, bbb, yyy);
      scanner0.close();

      // restore the snapshot into restoreDir.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // scan the already-restored snapshot without restoring it again
      TableSnapshotScanner scanner =
          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();

      // verify that closing the scanner did not delete the restored snapshot files
      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();

      // restore the snapshot again.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // verify again that closing the scanner did not delete the restored snapshot files
      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

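  /**
   * Creates a table with the given number of regions, snapshots it, optionally shuts down the
   * HBase cluster, and verifies that a TableSnapshotScanner over the snapshot sees every row
   * in [bbb, yyy).
   */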
  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
      boolean shutdownCluster) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    try {
      createTableAndSnapshot(util, tableName, snapshotName, numRegions);

      if (shutdownCluster) {
        util.shutdownMiniHBaseCluster();
      }

      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      TableSnapshotScanner scanner = new TableSnapshotScanner(util.getConfiguration(), restoreDir,
        snapshotName, scan);

      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      // snapshot and table cleanup need a live HBase cluster; the mini cluster itself is always
      // shut down so the next test starts from a clean state.
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
      tearDownCluster();
    }
  }

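  /**
   * Drains the scanner, verifies each row, and asserts that every row in
   * [startRow, stopRow) was seen.
   */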
  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {

    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    while (true) {
      Result result = scanner.next();
      if (result == null) {
        break;
      }
      verifyRow(result);
      rowTracker.addRow(result.getRow());
    }

    // validate all rows are seen
    rowTracker.validate();
  }

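  /**
   * Asserts that every cell in the result carries the result's row key and that the value
   * stored under each column family equals the row key.
   */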
  private static void verifyRow(Result result) throws IOException {
    byte[] row = result.getRow();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      // assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
    }

    for (int j = 0; j < FAMILIES.length; j++) {
      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
      Assert.assertArrayEquals("Row in snapshot does not match, expected: " + Bytes.toString(row)
          + ", actual: " + Bytes.toString(actual), row, actual);
    }
  }

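  /**
   * Merges two regions, snapshots the table, major-compacts the merged region, archives the
   * parent regions via the catalog janitor, and runs the HFile cleaner. Scanning the snapshot
   * afterwards must still succeed, i.e. the archived files it references must not be deleted.
   */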
  @Test
  public void testMergeRegion() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testMergeRegion");
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    long timeout = 20000; // 20s
    try (Admin admin = UTIL.getAdmin()) {
      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
          .collect(Collectors.toList());
      // create a table with 3 regions
      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      RegionInfo region0 = regions.get(0);
      RegionInfo region1 = regions.get(1);
      RegionInfo region2 = regions.get(2);
      // put some data in the table
      UTIL.loadTable(table, FAMILIES);
      admin.flush(tableName);
      // wait until the flush is finished
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          for (RegionInfo region : regions) {
            Path regionDir = new Path(tableDir, region.getEncodedName());
            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
              if (fs.listStatus(familyDir).length != 1) {
                return false;
              }
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check if the flush is finished", e);
          return false;
        }
      });
      // merge 2 regions
      admin.compactionSwitch(false, serverList);
      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
        true);
      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
      RegionInfo mergedRegion =
          mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
              ? mergedRegions.get(1)
              : mergedRegions.get(0);
      // take a snapshot
      admin.snapshot(snapshotName, tableName);
      Assert.assertEquals(1, admin.listSnapshots().size());
      // major compact the merged region
      admin.compactionSwitch(true, serverList);
      admin.majorCompactRegion(mergedRegion.getRegionName());
      // wait until the merged region has no reference files
      UTIL.waitFor(timeout, () -> {
        try {
          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
              .getRegionServerThreads()) {
            HRegionServer regionServer = regionServerThread.getRegionServer();
            for (HRegion subRegion : regionServer.getRegions(tableName)) {
              if (subRegion.getRegionInfo().getEncodedName()
                  .equals(mergedRegion.getEncodedName())) {
                regionServer.getCompactedHFilesDischarger().chore();
              }
            }
          }
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          HRegionFileSystem regionFs = HRegionFileSystem
              .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
          return !regionFs.hasReferences(admin.getDescriptor(tableName));
        } catch (IOException e) {
          LOG.warn("Failed to check if the merged region has no references", e);
          return false;
        }
      });
      // run the catalog janitor and wait until the parent regions are archived
      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
            String name = fileStatus.getPath().getName();
            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check if the parent regions are archived", e);
          return false;
        }
      });
      // set the archived files' modification time back by the TTL, then run the HFile cleaner
      long time = System.currentTimeMillis() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
      // scan the snapshot
      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
        UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
        new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    } catch (Exception e) {
      // The snapshot scan must not fail even after the merged parents have been archived and the
      // HFile cleaner has run, so any exception here (in particular a FileNotFoundException for
      // an archived store file) fails the test.
      LOG.error("scan snapshot error", e);
      Assert.fail("Should not throw FileNotFoundException");
    } finally {
      tearDownCluster();
    }
  }

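  /**
   * Recursively sets the modification time of the given path and of every file and directory
   * underneath it.
   */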
  private void traverseAndSetFileTime(Path path, long time) throws IOException {
    fs.setTimes(path, time, -1);
    if (fs.isDirectory(path)) {
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
      List<FileStatus> subDirs =
          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
      for (FileStatus subDir : subDirs) {
        traverseAndSetFileTime(subDir.getPath(), time);
      }
      for (FileStatus file : files) {
        fs.setTimes(file.getPath(), time, -1);
      }
    }
  }
}