001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.List;
024import java.util.stream.Collectors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HRegionInfo;
034import org.apache.hadoop.hbase.StartMiniClusterOption;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
037import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
038import org.apache.hadoop.hbase.regionserver.HRegion;
039import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
040import org.apache.hadoop.hbase.regionserver.HRegionServer;
041import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
042import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.LargeTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.CommonFSUtils;
047import org.apache.hadoop.hbase.util.FSUtils;
048import org.apache.hadoop.hbase.util.HFileArchiveUtil;
049import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
050import org.junit.After;
051import org.junit.Assert;
052import org.junit.ClassRule;
053import org.junit.Rule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.rules.TestName;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060@Category({LargeTests.class, ClientTests.class})
061public class TestTableSnapshotScanner {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
068  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
069  private static final int NUM_REGION_SERVERS = 2;
070  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
071  public static byte[] bbb = Bytes.toBytes("bbb");
072  public static byte[] yyy = Bytes.toBytes("yyy");
073
074  private FileSystem fs;
075  private Path rootDir;
076
077  @Rule
078  public TestName name = new TestName();
079
080  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
081      int expectedRegionSize) throws Exception {
082    for (int i = 0; i < 100; i++) {
083      List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName);
084      if (hRegionInfoList.size() >= expectedRegionSize) {
085        break;
086      }
087      Thread.sleep(1000);
088    }
089  }
090
091  public void setupCluster() throws Exception {
092    setupConf(UTIL.getConfiguration());
093    StartMiniClusterOption option = StartMiniClusterOption.builder()
094        .numRegionServers(NUM_REGION_SERVERS).numDataNodes(NUM_REGION_SERVERS)
095        .createRootDir(true).build();
096    UTIL.startMiniCluster(option);
097    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
098    fs = rootDir.getFileSystem(UTIL.getConfiguration());
099  }
100
101  public void tearDownCluster() throws Exception {
102    UTIL.shutdownMiniCluster();
103  }
104
105  private static void setupConf(Configuration conf) {
106    // Enable snapshot
107    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
108  }
109
110  @After
111  public void tearDown() throws Exception {
112  }
113
114  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
115      String snapshotName, int numRegions)
116      throws Exception {
117    try {
118      util.deleteTable(tableName);
119    } catch(Exception ex) {
120      // ignore
121    }
122
123    if (numRegions > 1) {
124      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
125    } else {
126      util.createTable(tableName, FAMILIES);
127    }
128    Admin admin = util.getAdmin();
129
130    // put some stuff in the table
131    Table table = util.getConnection().getTable(tableName);
132    util.loadTable(table, FAMILIES);
133
134    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
135    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
136
137    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
138        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
139
140    // load different values
141    byte[] value = Bytes.toBytes("after_snapshot_value");
142    util.loadTable(table, FAMILIES, value);
143
144    // cause flush to create new files in the region
145    admin.flush(tableName);
146    table.close();
147  }
148
149  @Test
150  public void testNoDuplicateResultsWhenSplitting() throws Exception {
151    setupCluster();
152    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
153    String snapshotName = "testSnapshotBug";
154    try {
155      if (UTIL.getAdmin().tableExists(tableName)) {
156        UTIL.deleteTable(tableName);
157      }
158
159      UTIL.createTable(tableName, FAMILIES);
160      Admin admin = UTIL.getAdmin();
161
162      // put some stuff in the table
163      Table table = UTIL.getConnection().getTable(tableName);
164      UTIL.loadTable(table, FAMILIES);
165
166      // split to 2 regions
167      admin.split(tableName, Bytes.toBytes("eee"));
168      blockUntilSplitFinished(UTIL, tableName, 2);
169
170      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
171      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
172
173      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
174        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
175
176      // load different values
177      byte[] value = Bytes.toBytes("after_snapshot_value");
178      UTIL.loadTable(table, FAMILIES, value);
179
180      // cause flush to create new files in the region
181      admin.flush(tableName);
182      table.close();
183
184      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
185      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
186
187      TableSnapshotScanner scanner =
188          new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
189
190      verifyScanner(scanner, bbb, yyy);
191      scanner.close();
192    } finally {
193      UTIL.getAdmin().deleteSnapshot(snapshotName);
194      UTIL.deleteTable(tableName);
195      tearDownCluster();
196    }
197  }
198
199
200  @Test
201  public void testScanLimit() throws Exception {
202    setupCluster();
203    final TableName tableName = TableName.valueOf(name.getMethodName());
204    final String snapshotName = tableName + "Snapshot";
205    TableSnapshotScanner scanner = null;
206    try {
207      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
208      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
209      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan
210
211      scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
212      int count = 0;
213      while (true) {
214        Result result = scanner.next();
215        if (result == null) {
216          break;
217        }
218        count++;
219      }
220      Assert.assertEquals(100, count);
221    } finally {
222      if (scanner != null) {
223        scanner.close();
224      }
225      UTIL.getAdmin().deleteSnapshot(snapshotName);
226      UTIL.deleteTable(tableName);
227      tearDownCluster();
228    }
229  }
230
231  @Test
232  public void testWithSingleRegion() throws Exception {
233    testScanner(UTIL, "testWithSingleRegion", 1, false);
234  }
235
236  @Test
237  public void testWithMultiRegion() throws Exception {
238    testScanner(UTIL, "testWithMultiRegion", 10, false);
239  }
240
241  @Test
242  public void testWithOfflineHBaseMultiRegion() throws Exception {
243    testScanner(UTIL, "testWithMultiRegion", 20, true);
244  }
245
246  @Test
247  public void testScannerWithRestoreScanner() throws Exception {
248    setupCluster();
249    TableName tableName = TableName.valueOf("testScanner");
250    String snapshotName = "testScannerWithRestoreScanner";
251    try {
252      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
253      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
254      Scan scan = new Scan(bbb, yyy); // limit the scan
255
256      Configuration conf = UTIL.getConfiguration();
257      Path rootDir = CommonFSUtils.getRootDir(conf);
258
259      TableSnapshotScanner scanner0 =
260          new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
261      verifyScanner(scanner0, bbb, yyy);
262      scanner0.close();
263
264      // restore snapshot.
265      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
266
267      // scan the snapshot without restoring snapshot
268      TableSnapshotScanner scanner =
269          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
270      verifyScanner(scanner, bbb, yyy);
271      scanner.close();
272
273      // check whether the snapshot has been deleted by the close of scanner.
274      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
275      verifyScanner(scanner, bbb, yyy);
276      scanner.close();
277
278      // restore snapshot again.
279      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
280
281      // check whether the snapshot has been deleted by the close of scanner.
282      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
283      verifyScanner(scanner, bbb, yyy);
284      scanner.close();
285    } finally {
286      UTIL.getAdmin().deleteSnapshot(snapshotName);
287      UTIL.deleteTable(tableName);
288      tearDownCluster();
289    }
290  }
291
292  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
293      boolean shutdownCluster) throws Exception {
294    setupCluster();
295    TableName tableName = TableName.valueOf("testScanner");
296    try {
297      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
298
299      if (shutdownCluster) {
300        util.shutdownMiniHBaseCluster();
301      }
302
303      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
304      Scan scan = new Scan(bbb, yyy); // limit the scan
305
306      TableSnapshotScanner scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir,
307        snapshotName, scan);
308
309      verifyScanner(scanner, bbb, yyy);
310      scanner.close();
311    } finally {
312      if (!shutdownCluster) {
313        util.getAdmin().deleteSnapshot(snapshotName);
314        util.deleteTable(tableName);
315        tearDownCluster();
316      }
317    }
318  }
319
320  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
321      throws IOException, InterruptedException {
322
323    HBaseTestingUtility.SeenRowTracker rowTracker =
324        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
325
326    while (true) {
327      Result result = scanner.next();
328      if (result == null) {
329        break;
330      }
331      verifyRow(result);
332      rowTracker.addRow(result.getRow());
333    }
334
335    // validate all rows are seen
336    rowTracker.validate();
337  }
338
339  private static void verifyRow(Result result) throws IOException {
340    byte[] row = result.getRow();
341    CellScanner scanner = result.cellScanner();
342    while (scanner.advance()) {
343      Cell cell = scanner.current();
344
345      //assert that all Cells in the Result have the same key
346     Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
347         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
348    }
349
350    for (int j = 0; j < FAMILIES.length; j++) {
351      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
352      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
353          + " ,actual:" + Bytes.toString(actual), row, actual);
354    }
355  }
356
357  @Test
358  public void testMergeRegion() throws Exception {
359    setupCluster();
360    TableName tableName = TableName.valueOf("testMergeRegion");
361    String snapshotName = tableName.getNameAsString() + "_snapshot";
362    Configuration conf = UTIL.getConfiguration();
363    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
364    long timeout = 20000; // 20s
365    try (Admin admin = UTIL.getAdmin()) {
366      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
367          .collect(Collectors.toList());
368      // create table with 3 regions
369      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
370      List<RegionInfo> regions = admin.getRegions(tableName);
371      Assert.assertEquals(3, regions.size());
372      RegionInfo region0 = regions.get(0);
373      RegionInfo region1 = regions.get(1);
374      RegionInfo region2 = regions.get(2);
375      // put some data in the table
376      UTIL.loadTable(table, FAMILIES);
377      admin.flush(tableName);
378      // wait flush is finished
379      UTIL.waitFor(timeout, () -> {
380        try {
381          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
382          for (RegionInfo region : regions) {
383            Path regionDir = new Path(tableDir, region.getEncodedName());
384            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
385              if (fs.listStatus(familyDir).length != 1) {
386                return false;
387              }
388            }
389          }
390          return true;
391        } catch (IOException e) {
392          LOG.warn("Failed check if flush is finished", e);
393          return false;
394        }
395      });
396      // merge 2 regions
397      admin.compactionSwitch(false, serverList);
398      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
399        true);
400      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
401      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
402      RegionInfo mergedRegion =
403          mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
404              ? mergedRegions.get(1)
405              : mergedRegions.get(0);
406      // snapshot
407      admin.snapshot(snapshotName, tableName);
408      Assert.assertEquals(1, admin.listSnapshots().size());
409      // major compact
410      admin.compactionSwitch(true, serverList);
411      admin.majorCompactRegion(mergedRegion.getRegionName());
412      // wait until merged region has no reference
413      UTIL.waitFor(timeout, () -> {
414        try {
415          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
416              .getRegionServerThreads()) {
417            HRegionServer regionServer = regionServerThread.getRegionServer();
418            for (HRegion subRegion : regionServer.getRegions(tableName)) {
419              if (subRegion.getRegionInfo().getEncodedName()
420                  .equals(mergedRegion.getEncodedName())) {
421                regionServer.getCompactedHFilesDischarger().chore();
422              }
423            }
424          }
425          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
426          HRegionFileSystem regionFs = HRegionFileSystem
427              .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
428          return !regionFs.hasReferences(admin.getDescriptor(tableName));
429        } catch (IOException e) {
430          LOG.warn("Failed check merged region has no reference", e);
431          return false;
432        }
433      });
434      // run catalog janitor to clean and wait for parent regions are archived
435      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
436      UTIL.waitFor(timeout, () -> {
437        try {
438          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
439          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
440            String name = fileStatus.getPath().getName();
441            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
442              return false;
443            }
444          }
445          return true;
446        } catch (IOException e) {
447          LOG.warn("Check if parent regions are archived error", e);
448          return false;
449        }
450      });
451      // set file modify time and then run cleaner
452      long time = System.currentTimeMillis() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
453      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
454      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
455      // scan snapshot
456      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
457          UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) {
458        verifyScanner(scanner, bbb, yyy);
459      }
460    } catch (Exception e) {
461      LOG.error("scan snapshot error", e);
462      Assert.fail("Should not throw FileNotFoundException");
463      Assert.assertTrue(e.getCause() != null);
464      Assert.assertTrue(e.getCause().getCause() instanceof FileNotFoundException);
465    } finally {
466      tearDownCluster();
467    }
468  }
469
470  private void traverseAndSetFileTime(Path path, long time) throws IOException {
471    fs.setTimes(path, time, -1);
472    if (fs.isDirectory(path)) {
473      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
474      List<FileStatus> subDirs =
475          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
476      List<FileStatus> files =
477          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
478      for (FileStatus subDir : subDirs) {
479        traverseAndSetFileTime(subDir.getPath(), time);
480      }
481      for (FileStatus file : files) {
482        fs.setTimes(file.getPath(), time, -1);
483      }
484    }
485  }
486}