019package org.apache.hadoop.hbase.snapshot;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.TableNameTestRule;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
038import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.CommonFSUtils;
042import org.apache.hadoop.hbase.util.FSVisitor;
043import org.junit.AfterClass;
044import org.junit.Assert;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Rule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
053import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
054import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
057 * Test Case for HBASE-21387
058 */
059@Category({ MediumTests.class })
060public class TestSnapshotWhenChoreCleaning {
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestSnapshotWhenChoreCleaning.class);
066  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
067  private static final Configuration CONF = TEST_UTIL.getConfiguration();
068  private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotClientRetries.class);
069  private static final TableName TABLE_NAME = TableName.valueOf("testTable");
070  private static final int MAX_SPLIT_KEYS_NUM = 100;
071  private static final byte[] FAMILY = Bytes.toBytes("family");
072  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
073  private static final byte[] VALUE = Bytes.toBytes("value");
074  private static Table TABLE;
076  @Rule
077  public TableNameTestRule testTable = new TableNameTestRule();
079  @BeforeClass
080  public static void setUp() throws Exception {
081    // Set the hbase.snapshot.thread.pool.max to 1;
082    CONF.setInt("hbase.snapshot.thread.pool.max", 1);
083    // Enable snapshot
084    CONF.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
085    // Start MiniCluster.
086    TEST_UTIL.startMiniCluster(3);
087    // Create talbe
088    createTable();
089  }
091  private static byte[] integerToBytes(int i) {
092    return Bytes.toBytes(String.format("%06d", i));
093  }
095  private static void createTable() throws IOException {
096    byte[][] splitKeys = new byte[MAX_SPLIT_KEYS_NUM][];
097    for (int i = 0; i < splitKeys.length; i++) {
098      splitKeys[i] = integerToBytes(i);
099    }
100    TABLE = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
101  }
103  @AfterClass
104  public static void tearDown() throws Exception {
105    TEST_UTIL.shutdownMiniCluster();
106  }
108  private static void loadDataAndFlush() throws IOException {
109    for (int i = 0; i < MAX_SPLIT_KEYS_NUM; i++) {
110      Put put = new Put(integerToBytes(i)).addColumn(FAMILY, QUALIFIER,
111        Bytes.add(VALUE, Bytes.toBytes(i)));
112      TABLE.put(put);
113    }
114    TEST_UTIL.flush(TABLE_NAME);
115  }
117  private static List<Path> listHFileNames(final FileSystem fs, final Path tableDir)
118      throws IOException {
119    final List<Path> hfiles = new ArrayList<>();
120    FSVisitor.visitTableStoreFiles(fs, tableDir, (region, family, hfileName) -> {
121      hfiles.add(new Path(new Path(new Path(tableDir, region), family), hfileName));
122    });
123    Collections.sort(hfiles);
124    return hfiles;
125  }
127  private static boolean isAnySnapshots(FileSystem fs) throws IOException {
128    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(CommonFSUtils.getRootDir(CONF));
129    FileStatus[] snapFiles = fs.listStatus(snapshotDir);
130    if (snapFiles.length == 0) {
131      return false;
132    }
133    Path firstPath = snapFiles[0].getPath();
134    LOG.info("firstPath in isAnySnapshots: " + firstPath);
135    if (snapFiles.length == 1 && firstPath.getName().equals(".tmp")) {
136      FileStatus[] tmpSnapFiles = fs.listStatus(firstPath);
137      return tmpSnapFiles != null && tmpSnapFiles.length > 0;
138    }
139    return true;
140  }
142  @Test
143  public void testSnapshotWhenSnapshotHFileCleanerRunning() throws Exception {
144    // Load data and flush to generate huge number of HFiles.
145    loadDataAndFlush();
147    SnapshotHFileCleaner cleaner = new SnapshotHFileCleaner();
148    cleaner.init(ImmutableMap.of(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster()));
149    cleaner.setConf(CONF);
151    FileSystem fs = CommonFSUtils.getCurrentFileSystem(CONF);
152    List<Path> fileNames =
153        listHFileNames(fs, CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(CONF), TABLE_NAME));
154    List<FileStatus> files = new ArrayList<>();
155    for (Path fileName : fileNames) {
156      files.add(fs.getFileStatus(fileName));
157    }
159    TEST_UTIL.getAdmin().snapshot("snapshotName_prev", TABLE_NAME);
160    Assert.assertEquals(Lists.newArrayList(cleaner.getDeletableFiles(files)).size(), 0);
161    TEST_UTIL.getAdmin().deleteSnapshot("snapshotName_prev");
162    cleaner.getFileCacheForTesting().triggerCacheRefreshForTesting();
163    Assert.assertEquals(Lists.newArrayList(cleaner.getDeletableFiles(files)).size(), 100);
165    Runnable snapshotRunnable = () -> {
166      try {
167        // The thread will be busy on taking snapshot;
168        for (int k = 0; k < 5; k++) {
169          TEST_UTIL.getAdmin().snapshot("snapshotName_" + k, TABLE_NAME);
170        }
171      } catch (Exception e) {
172        LOG.error("Snapshot failed: ", e);
173      }
174    };
175    final AtomicBoolean success = new AtomicBoolean(true);
176    Runnable cleanerRunnable = () -> {
177      try {
178        while (!isAnySnapshots(fs)) {
179          LOG.info("Not found any snapshot, sleep 100ms");
180          Thread.sleep(100);
181        }
182        for (int k = 0; k < 5; k++) {
183          cleaner.getFileCacheForTesting().triggerCacheRefreshForTesting();
184          Iterable<FileStatus> toDeleteFiles = cleaner.getDeletableFiles(files);
185          List<FileStatus> deletableFiles = Lists.newArrayList(toDeleteFiles);
186          LOG.info("Size of deletableFiles is: " + deletableFiles.size());
187          for (int i = 0; i < deletableFiles.size(); i++) {
188            LOG.debug("toDeleteFiles[{}] is: {}", i, deletableFiles.get(i));
189          }
190          if (deletableFiles.size() > 0) {
191            success.set(false);
192          }
193        }
194      } catch (Exception e) {
195        LOG.error("Chore cleaning failed: ", e);
196      }
197    };
198    Thread t1 = new Thread(snapshotRunnable);
199    t1.start();
200    Thread t2 = new Thread(cleanerRunnable);
201    t2.start();
202    t1.join();
203    t2.join();
204    Assert.assertTrue(success.get());
205  }