001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.stream.Collectors;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellScanner;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.StartTestingClusterOption;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
035import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
036import org.apache.hadoop.hbase.regionserver.HRegion;
037import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
038import org.apache.hadoop.hbase.regionserver.HRegionServer;
039import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
040import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.CommonFSUtils;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
046import org.apache.hadoop.hbase.util.FSUtils;
047import org.apache.hadoop.hbase.util.HFileArchiveUtil;
048import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
049import org.junit.After;
050import org.junit.Assert;
051import org.junit.Before;
052import org.junit.ClassRule;
053import org.junit.Rule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.rules.TestName;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060@Category({ LargeTests.class, ClientTests.class })
061public class TestTableSnapshotScanner {
062
  // JUnit class rule built for this test class through HBaseClassTestRule.forClass.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
  // Instance field (not static): setupCluster() starts a mini cluster on it before each test and
  // tearDownCluster() shuts it down afterwards.
  private final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  // Used both as the region server count and as the data node count of the mini cluster.
  private static final int NUM_REGION_SERVERS = 2;
  // Column families of every test table; verifyRow() also uses the family name as qualifier.
  private static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2") };
  // Passed as start/end key when creating multi-region tables and as scan start/stop rows.
  public static byte[] bbb = Bytes.toBytes("bbb");
  public static byte[] yyy = Bytes.toBytes("yyy");

  // File system and root directory of the running master; both are assigned in setupCluster().
  private FileSystem fs;
  private Path rootDir;
  // Set to true by setupCluster(); testScanner() resets it to false after shutting the HBase
  // cluster down, which makes tearDownCluster() and the test's own cleanup skip their work.
  private boolean clusterUp;

  // Exposes the running test method's name; some tests derive their table name from it.
  @Rule
  public TestName name = new TestName();
080
081  public static void blockUntilSplitFinished(HBaseTestingUtil util, TableName tableName,
082    int expectedRegionSize) throws Exception {
083    for (int i = 0; i < 100; i++) {
084      List<RegionInfo> hRegionInfoList = util.getAdmin().getRegions(tableName);
085      if (hRegionInfoList.size() >= expectedRegionSize) {
086        break;
087      }
088      Thread.sleep(1000);
089    }
090  }
091
092  @Before
093  public void setupCluster() throws Exception {
094    setupConf(UTIL.getConfiguration());
095    StartTestingClusterOption option =
096      StartTestingClusterOption.builder().numRegionServers(NUM_REGION_SERVERS)
097        .numDataNodes(NUM_REGION_SERVERS).createRootDir(true).build();
098    UTIL.startMiniCluster(option);
099    clusterUp = true;
100    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
101    fs = rootDir.getFileSystem(UTIL.getConfiguration());
102  }
103
104  @After
105  public void tearDownCluster() throws Exception {
106    if (clusterUp) {
107      UTIL.shutdownMiniCluster();
108    }
109  }
110
111  protected void setupConf(Configuration conf) {
112    // Enable snapshot
113    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
114  }
115
116  public static void createTableAndSnapshot(HBaseTestingUtil util, TableName tableName,
117    String snapshotName, int numRegions) throws Exception {
118    try {
119      util.deleteTable(tableName);
120    } catch (Exception ex) {
121      // ignore
122    }
123
124    if (numRegions > 1) {
125      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
126    } else {
127      util.createTable(tableName, FAMILIES);
128    }
129    Admin admin = util.getAdmin();
130
131    // put some stuff in the table
132    Table table = util.getConnection().getTable(tableName);
133    util.loadTable(table, FAMILIES);
134
135    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
136    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
137
138    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES), null,
139      snapshotName, rootDir, fs, true);
140
141    // load different values
142    byte[] value = Bytes.toBytes("after_snapshot_value");
143    util.loadTable(table, FAMILIES, value);
144
145    // cause flush to create new files in the region
146    admin.flush(tableName);
147    table.close();
148  }
149
150  @Test
151  public void testNoDuplicateResultsWhenSplitting() throws Exception {
152    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
153    String snapshotName = "testSnapshotBug";
154    try {
155      if (UTIL.getAdmin().tableExists(tableName)) {
156        UTIL.deleteTable(tableName);
157      }
158
159      UTIL.createTable(tableName, FAMILIES);
160      Admin admin = UTIL.getAdmin();
161
162      // put some stuff in the table
163      Table table = UTIL.getConnection().getTable(tableName);
164      UTIL.loadTable(table, FAMILIES);
165
166      // split to 2 regions
167      admin.split(tableName, Bytes.toBytes("eee"));
168      blockUntilSplitFinished(UTIL, tableName, 2);
169
170      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
171      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
172
173      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
174        null, snapshotName, rootDir, fs, true);
175
176      // load different values
177      byte[] value = Bytes.toBytes("after_snapshot_value");
178      UTIL.loadTable(table, FAMILIES, value);
179
180      // cause flush to create new files in the region
181      admin.flush(tableName);
182      table.close();
183
184      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
185      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
186
187      TableSnapshotScanner scanner =
188        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
189
190      verifyScanner(scanner, bbb, yyy);
191      scanner.close();
192    } catch (Exception e) {
193      e.printStackTrace();
194    } finally {
195      UTIL.getAdmin().deleteSnapshot(snapshotName);
196      UTIL.deleteTable(tableName);
197    }
198  }
199
200  @Test
201  public void testScanLimit() throws Exception {
202    final TableName tableName = TableName.valueOf(name.getMethodName());
203    final String snapshotName = tableName + "Snapshot";
204    TableSnapshotScanner scanner = null;
205    try {
206      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
207      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
208      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan
209
210      scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
211      int count = 0;
212      while (true) {
213        Result result = scanner.next();
214        if (result == null) {
215          break;
216        }
217        count++;
218      }
219      Assert.assertEquals(100, count);
220    } finally {
221      if (scanner != null) {
222        scanner.close();
223      }
224      UTIL.getAdmin().deleteSnapshot(snapshotName);
225      UTIL.deleteTable(tableName);
226    }
227  }
228
229  @Test
230  public void testWithSingleRegion() throws Exception {
231    testScanner(UTIL, "testWithSingleRegion", 1, false);
232  }
233
234  @Test
235  public void testWithMultiRegion() throws Exception {
236    testScanner(UTIL, "testWithMultiRegion", 10, false);
237  }
238
239  @Test
240  public void testWithOfflineHBaseMultiRegion() throws Exception {
241    testScanner(UTIL, "testWithMultiRegion", 20, true);
242  }
243
244  @Test
245  public void testScannerWithRestoreScanner() throws Exception {
246    TableName tableName = TableName.valueOf("testScanner");
247    String snapshotName = "testScannerWithRestoreScanner";
248    try {
249      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
250      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
251      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
252
253      Configuration conf = UTIL.getConfiguration();
254      Path rootDir = CommonFSUtils.getRootDir(conf);
255
256      TableSnapshotScanner scanner0 =
257        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
258      verifyScanner(scanner0, bbb, yyy);
259      scanner0.close();
260
261      // restore snapshot.
262      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
263
264      // scan the snapshot without restoring snapshot
265      TableSnapshotScanner scanner =
266        new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
267      verifyScanner(scanner, bbb, yyy);
268      scanner.close();
269
270      // check whether the snapshot has been deleted by the close of scanner.
271      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
272      verifyScanner(scanner, bbb, yyy);
273      scanner.close();
274
275      // restore snapshot again.
276      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
277
278      // check whether the snapshot has been deleted by the close of scanner.
279      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
280      verifyScanner(scanner, bbb, yyy);
281      scanner.close();
282    } finally {
283      UTIL.getAdmin().deleteSnapshot(snapshotName);
284      UTIL.deleteTable(tableName);
285    }
286  }
287
288  private void testScanner(HBaseTestingUtil util, String snapshotName, int numRegions,
289    boolean shutdownCluster) throws Exception {
290    TableName tableName = TableName.valueOf("testScanner");
291    try {
292      createTableAndSnapshot(util, tableName, snapshotName, numRegions);
293
294      if (shutdownCluster) {
295        util.shutdownMiniHBaseCluster();
296        clusterUp = false;
297      }
298
299      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
300      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan
301
302      TableSnapshotScanner scanner =
303        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
304
305      verifyScanner(scanner, bbb, yyy);
306      scanner.close();
307    } finally {
308      if (clusterUp) {
309        util.getAdmin().deleteSnapshot(snapshotName);
310        util.deleteTable(tableName);
311      }
312    }
313  }
314
315  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
316    throws IOException, InterruptedException {
317
318    HBaseTestingUtil.SeenRowTracker rowTracker =
319      new HBaseTestingUtil.SeenRowTracker(startRow, stopRow);
320
321    while (true) {
322      Result result = scanner.next();
323      if (result == null) {
324        break;
325      }
326      verifyRow(result);
327      rowTracker.addRow(result.getRow());
328    }
329
330    // validate all rows are seen
331    rowTracker.validate();
332  }
333
334  private static void verifyRow(Result result) throws IOException {
335    byte[] row = result.getRow();
336    CellScanner scanner = result.cellScanner();
337    while (scanner.advance()) {
338      Cell cell = scanner.current();
339
340      // assert that all Cells in the Result have the same key
341      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, cell.getRowArray(),
342        cell.getRowOffset(), cell.getRowLength()));
343    }
344
345    for (int j = 0; j < FAMILIES.length; j++) {
346      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
347      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
348        + " ,actual:" + Bytes.toString(actual), row, actual);
349    }
350  }
351
352  @Test
353  public void testMergeRegion() throws Exception {
354    TableName tableName = TableName.valueOf("testMergeRegion");
355    String snapshotName = tableName.getNameAsString() + "_snapshot";
356    Configuration conf = UTIL.getConfiguration();
357    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
358    long timeout = 20000; // 20s
359    try (Admin admin = UTIL.getAdmin()) {
360      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
361        .collect(Collectors.toList());
362      // create table with 3 regions
363      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
364      List<RegionInfo> regions = admin.getRegions(tableName);
365      Assert.assertEquals(3, regions.size());
366      RegionInfo region0 = regions.get(0);
367      RegionInfo region1 = regions.get(1);
368      RegionInfo region2 = regions.get(2);
369      // put some data in the table
370      UTIL.loadTable(table, FAMILIES);
371      admin.flush(tableName);
372      // wait flush is finished
373      UTIL.waitFor(timeout, () -> {
374        try {
375          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
376          for (RegionInfo region : regions) {
377            Path regionDir = new Path(tableDir, region.getEncodedName());
378            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
379              for (FileStatus fs : fs.listStatus(familyDir)) {
380                if (!fs.getPath().getName().equals(".filelist")) {
381                  return true;
382                }
383              }
384              return false;
385            }
386          }
387          return true;
388        } catch (IOException e) {
389          LOG.warn("Failed check if flush is finished", e);
390          return false;
391        }
392      });
393      // merge 2 regions
394      admin.compactionSwitch(false, serverList);
395      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
396        true);
397      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
398      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
399      RegionInfo mergedRegion =
400        mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
401          ? mergedRegions.get(1)
402          : mergedRegions.get(0);
403      // snapshot
404      admin.snapshot(snapshotName, tableName);
405      Assert.assertEquals(1, admin.listSnapshots().size());
406      // major compact
407      admin.compactionSwitch(true, serverList);
408      admin.majorCompactRegion(mergedRegion.getRegionName());
409      // wait until merged region has no reference
410      UTIL.waitFor(timeout, () -> {
411        try {
412          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
413            .getRegionServerThreads()) {
414            HRegionServer regionServer = regionServerThread.getRegionServer();
415            for (HRegion subRegion : regionServer.getRegions(tableName)) {
416              if (
417                subRegion.getRegionInfo().getEncodedName().equals(mergedRegion.getEncodedName())
418              ) {
419                regionServer.getCompactedHFilesDischarger().chore();
420              }
421            }
422          }
423          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
424          HRegionFileSystem regionFs = HRegionFileSystem
425            .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
426          return !regionFs.hasReferences(admin.getDescriptor(tableName));
427        } catch (IOException e) {
428          LOG.warn("Failed check merged region has no reference", e);
429          return false;
430        }
431      });
432      // run catalog janitor to clean and wait for parent regions are archived
433      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
434      UTIL.waitFor(timeout, () -> {
435        try {
436          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
437          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
438            String name = fileStatus.getPath().getName();
439            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
440              return false;
441            }
442          }
443          return true;
444        } catch (IOException e) {
445          LOG.warn("Check if parent regions are archived error", e);
446          return false;
447        }
448      });
449      // set file modify time and then run cleaner
450      long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
451      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
452      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
453      // scan snapshot
454      try (TableSnapshotScanner scanner =
455        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
456          new Scan().withStartRow(bbb).withStopRow(yyy))) {
457        verifyScanner(scanner, bbb, yyy);
458      }
459    } catch (Exception e) {
460      LOG.error("scan snapshot error", e);
461      Assert.fail("Should not throw Exception: " + e.getMessage());
462    }
463  }
464
465  @Test
466  public void testDeleteTableWithMergedRegions() throws Exception {
467    final TableName tableName = TableName.valueOf(this.name.getMethodName());
468    String snapshotName = tableName.getNameAsString() + "_snapshot";
469    Configuration conf = UTIL.getConfiguration();
470    try (Admin admin = UTIL.getConnection().getAdmin()) {
471      // disable compaction
472      admin.compactionSwitch(false,
473        admin.getRegionServers().stream().map(s -> s.getServerName()).collect(Collectors.toList()));
474      // create table
475      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
476      List<RegionInfo> regions = admin.getRegions(tableName);
477      Assert.assertEquals(3, regions.size());
478      // write some data
479      UTIL.loadTable(table, FAMILIES);
480      // merge region
481      admin.mergeRegionsAsync(new byte[][] { regions.get(0).getEncodedNameAsBytes(),
482        regions.get(1).getEncodedNameAsBytes() }, false).get();
483      regions = admin.getRegions(tableName);
484      Assert.assertEquals(2, regions.size());
485      // snapshot
486      admin.snapshot(snapshotName, tableName);
487      // verify snapshot
488      try (TableSnapshotScanner scanner =
489        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
490          new Scan().withStartRow(bbb).withStopRow(yyy))) {
491        verifyScanner(scanner, bbb, yyy);
492      }
493      // drop table
494      admin.disableTable(tableName);
495      admin.deleteTable(tableName);
496      // verify snapshot
497      try (TableSnapshotScanner scanner =
498        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
499          new Scan().withStartRow(bbb).withStopRow(yyy))) {
500        verifyScanner(scanner, bbb, yyy);
501      }
502    }
503  }
504
505  private void traverseAndSetFileTime(Path path, long time) throws IOException {
506    fs.setTimes(path, time, -1);
507    if (fs.isDirectory(path)) {
508      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
509      List<FileStatus> subDirs =
510        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
511      List<FileStatus> files =
512        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
513      for (FileStatus subDir : subDirs) {
514        traverseAndSetFileTime(subDir.getPath(), time);
515      }
516      for (FileStatus file : files) {
517        fs.setTimes(file.getPath(), time, -1);
518      }
519    }
520  }
521}