001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicBoolean;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.TableNameTestRule;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.master.HMaster;
036import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
037import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.CommonFSUtils;
041import org.apache.hadoop.hbase.util.FSVisitor;
042import org.junit.AfterClass;
043import org.junit.Assert;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Rule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054
055/**
056 * Test Case for HBASE-21387
057 */
058@Category({ MediumTests.class })
059public class TestSnapshotWhenChoreCleaning {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestSnapshotWhenChoreCleaning.class);
064
065  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
066  private static final Configuration CONF = TEST_UTIL.getConfiguration();
067  private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotClientRetries.class);
068  private static final TableName TABLE_NAME = TableName.valueOf("testTable");
069  private static final int MAX_SPLIT_KEYS_NUM = 100;
070  private static final byte[] FAMILY = Bytes.toBytes("family");
071  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
072  private static final byte[] VALUE = Bytes.toBytes("value");
073  private static Table TABLE;
074
075  @Rule
076  public TableNameTestRule testTable = new TableNameTestRule();
077
078  @BeforeClass
079  public static void setUp() throws Exception {
080    // Set the hbase.snapshot.thread.pool.max to 1;
081    CONF.setInt("hbase.snapshot.thread.pool.max", 1);
082    // Enable snapshot
083    CONF.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
084    // Start MiniCluster.
085    TEST_UTIL.startMiniCluster(3);
086    // Create talbe
087    createTable();
088  }
089
090  private static byte[] integerToBytes(int i) {
091    return Bytes.toBytes(String.format("%06d", i));
092  }
093
094  private static void createTable() throws IOException {
095    byte[][] splitKeys = new byte[MAX_SPLIT_KEYS_NUM][];
096    for (int i = 0; i < splitKeys.length; i++) {
097      splitKeys[i] = integerToBytes(i);
098    }
099    TABLE = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
100  }
101
102  @AfterClass
103  public static void tearDown() throws Exception {
104    TEST_UTIL.shutdownMiniCluster();
105  }
106
107  private static void loadDataAndFlush() throws IOException {
108    for (int i = 0; i < MAX_SPLIT_KEYS_NUM; i++) {
109      Put put =
110        new Put(integerToBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.add(VALUE, Bytes.toBytes(i)));
111      TABLE.put(put);
112    }
113    TEST_UTIL.flush(TABLE_NAME);
114  }
115
116  private static List<Path> listHFileNames(final FileSystem fs, final Path tableDir)
117    throws IOException {
118    final List<Path> hfiles = new ArrayList<>();
119    FSVisitor.visitTableStoreFiles(fs, tableDir, (region, family, hfileName) -> {
120      hfiles.add(new Path(new Path(new Path(tableDir, region), family), hfileName));
121    });
122    Collections.sort(hfiles);
123    return hfiles;
124  }
125
126  private static boolean isAnySnapshots(FileSystem fs) throws IOException {
127    Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(CommonFSUtils.getRootDir(CONF));
128    FileStatus[] snapFiles = fs.listStatus(snapshotDir);
129    if (snapFiles.length == 0) {
130      return false;
131    }
132    Path firstPath = snapFiles[0].getPath();
133    LOG.info("firstPath in isAnySnapshots: " + firstPath);
134    if (snapFiles.length == 1 && firstPath.getName().equals(".tmp")) {
135      FileStatus[] tmpSnapFiles = fs.listStatus(firstPath);
136      return tmpSnapFiles != null && tmpSnapFiles.length > 0;
137    }
138    return true;
139  }
140
141  @Test
142  public void testSnapshotWhenSnapshotHFileCleanerRunning() throws Exception {
143    // Load data and flush to generate huge number of HFiles.
144    loadDataAndFlush();
145
146    SnapshotHFileCleaner cleaner = new SnapshotHFileCleaner();
147    cleaner.init(ImmutableMap.of(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster()));
148    cleaner.setConf(CONF);
149
150    FileSystem fs = CommonFSUtils.getCurrentFileSystem(CONF);
151    List<Path> fileNames =
152      listHFileNames(fs, CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(CONF), TABLE_NAME));
153    List<FileStatus> files = new ArrayList<>();
154    for (Path fileName : fileNames) {
155      files.add(fs.getFileStatus(fileName));
156    }
157
158    TEST_UTIL.getAdmin().snapshot("snapshotName_prev", TABLE_NAME);
159    Assert.assertEquals(Lists.newArrayList(cleaner.getDeletableFiles(files)).size(), 0);
160    TEST_UTIL.getAdmin().deleteSnapshot("snapshotName_prev");
161    cleaner.getFileCacheForTesting().triggerCacheRefreshForTesting();
162    Assert.assertEquals(Lists.newArrayList(cleaner.getDeletableFiles(files)).size(), 100);
163
164    Runnable snapshotRunnable = () -> {
165      try {
166        // The thread will be busy on taking snapshot;
167        for (int k = 0; k < 5; k++) {
168          TEST_UTIL.getAdmin().snapshot("snapshotName_" + k, TABLE_NAME);
169        }
170      } catch (Exception e) {
171        LOG.error("Snapshot failed: ", e);
172      }
173    };
174    final AtomicBoolean success = new AtomicBoolean(true);
175    Runnable cleanerRunnable = () -> {
176      try {
177        while (!isAnySnapshots(fs)) {
178          LOG.info("Not found any snapshot, sleep 100ms");
179          Thread.sleep(100);
180        }
181        for (int k = 0; k < 5; k++) {
182          cleaner.getFileCacheForTesting().triggerCacheRefreshForTesting();
183          Iterable<FileStatus> toDeleteFiles = cleaner.getDeletableFiles(files);
184          List<FileStatus> deletableFiles = Lists.newArrayList(toDeleteFiles);
185          LOG.info("Size of deletableFiles is: " + deletableFiles.size());
186          for (int i = 0; i < deletableFiles.size(); i++) {
187            LOG.debug("toDeleteFiles[{}] is: {}", i, deletableFiles.get(i));
188          }
189          if (deletableFiles.size() > 0) {
190            success.set(false);
191          }
192        }
193      } catch (Exception e) {
194        LOG.error("Chore cleaning failed: ", e);
195      }
196    };
197    Thread t1 = new Thread(snapshotRunnable);
198    t1.start();
199    Thread t2 = new Thread(cleanerRunnable);
200    t2.start();
201    t1.join();
202    t2.join();
203    Assert.assertTrue(success.get());
204  }
205}