001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.List;
023import java.util.Timer;
024import java.util.TimerTask;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.Lock;
027import org.apache.commons.lang3.ArrayUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
034import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
035import org.apache.hadoop.hbase.util.CommonFSUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.yetus.audience.InterfaceStability;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
042import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
043import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
044
045/**
046 * Intelligently keep track of all the files for all the snapshots.
047 * <p>
048 * A cache of files is kept to avoid querying the {@link FileSystem} frequently. If there is a cache
049 * miss the directory modification time is used to ensure that we don't rescan directories that we
050 * already have in cache. We only check the modification times of the snapshot directories
051 * (/hbase/.snapshot/[snapshot_name]) to determine if the files need to be loaded into the cache.
052 * <p>
053 * New snapshots will be added to the cache and deleted snapshots will be removed when we refresh
054 * the cache. If the files underneath a snapshot directory are changed, but not the snapshot itself,
055 * we will ignore updates to that snapshot's files.
056 * <p>
057 * This is sufficient because each snapshot has its own directory and is added via an atomic rename
058 * <i>once</i>, when the snapshot is created. We don't need to worry about the data in the snapshot
059 * being run.
060 * <p>
061 * Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
062 * also removed from the cache.
063 * <p>
064 * A {@link SnapshotFileCache.SnapshotFileInspector} must be passed when creating <tt>this</tt> to
065 * allow extraction of files under /hbase/.snapshot/[snapshot name] directory, for each snapshot.
066 * This allows you to only cache files under, for instance, all the logs in the .logs directory or
067 * all the files under all the regions.
068 * <p>
069 * <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
070 * snapshots and will attempt to cache files from those snapshots as well.
071 * <p>
072 * Queries about a given file are thread-safe with respect to multiple queries and cache refreshes.
073 */
074@InterfaceAudience.Private
075@InterfaceStability.Evolving
076public class SnapshotFileCache implements Stoppable {
077  interface SnapshotFileInspector {
078    /**
079     * Returns a collection of file names needed by the snapshot.
080     * @param fs          {@link FileSystem} where snapshot mainifest files are stored
081     * @param snapshotDir {@link Path} to the snapshot directory to scan.
082     * @return the collection of file names needed by the snapshot.
083     */
084    Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
085      throws IOException;
086  }
087
088  private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
089  private volatile boolean stop = false;
090  private final FileSystem fs, workingFs;
091  private final SnapshotFileInspector fileInspector;
092  private final Path snapshotDir, workingSnapshotDir;
093  private volatile ImmutableSet<String> cache = ImmutableSet.of();
094  /**
095   * This is a helper map of information about the snapshot directories so we don't need to rescan
096   * them if they haven't changed since the last time we looked.
097   */
098  private ImmutableMap<String, SnapshotDirectoryInfo> snapshots = ImmutableMap.of();
099  private final Timer refreshTimer;
100
101  private static final int LOCK_TIMEOUT_MS = 30000;
102
103  /**
104   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
105   * filesystem.
106   * <p>
107   * Immediately loads the file cache.
108   * @param conf                 to extract the configured {@link FileSystem} where the snapshots
109   *                             are stored and hbase root directory
110   * @param cacheRefreshPeriod   frequency (ms) with which the cache should be refreshed
111   * @param cacheRefreshDelay    amount of time to wait for the cache to be refreshed
112   * @param refreshThreadName    name of the cache refresh thread
113   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
114   * @throws IOException if the {@link FileSystem} or root directory cannot be loaded
115   */
116  public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
117    String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
118    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
119      SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf)
120        .getFileSystem(conf),
121      SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
122      cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
123  }
124
125  /**
126   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
127   * filesystem
128   * @param fs                   {@link FileSystem} where the snapshots are stored
129   * @param rootDir              hbase root directory
130   * @param workingFs            {@link FileSystem} where ongoing snapshot mainifest files are
131   *                             stored
132   * @param workingDir           Location to store ongoing snapshot manifest files
133   * @param cacheRefreshPeriod   period (ms) with which the cache should be refreshed
134   * @param cacheRefreshDelay    amount of time to wait for the cache to be refreshed
135   * @param refreshThreadName    name of the cache refresh thread
136   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
137   */
138  public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
139    long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
140    SnapshotFileInspector inspectSnapshotFiles) {
141    this.fs = fs;
142    this.workingFs = workingFs;
143    this.workingSnapshotDir = workingDir;
144    this.fileInspector = inspectSnapshotFiles;
145    this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
146    // periodically refresh the file cache to make sure we aren't superfluously saving files.
147    this.refreshTimer = new Timer(refreshThreadName, true);
148    this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
149      cacheRefreshPeriod);
150  }
151
152  /**
153   * Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending
154   * cache refreshes.
155   * <p/>
156   * Blocks until the cache is refreshed.
157   * <p/>
158   * Exposed for TESTING.
159   */
160  public synchronized void triggerCacheRefreshForTesting() {
161    try {
162      refreshCache();
163    } catch (IOException e) {
164      LOG.warn("Failed to refresh snapshot hfile cache!", e);
165    }
166    LOG.debug("Current cache:" + cache);
167  }
168
169  /**
170   * Check to see if any of the passed file names is contained in any of the snapshots. First checks
171   * an in-memory cache of the files to keep. If its not in the cache, then the cache is refreshed
172   * and the cache checked again for that file. This ensures that we never return files that exist.
173   * <p>
174   * Note this may lead to periodic false positives for the file being referenced. Periodically, the
175   * cache is refreshed even if there are no requests to ensure that the false negatives get removed
176   * eventually. For instance, suppose you have a file in the snapshot and it gets loaded into the
177   * cache. Then at some point later that snapshot is deleted. If the cache has not been refreshed
178   * at that point, cache will still think the file system contains that file and return
179   * <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
180   * on the filesystem, we will never find it and always return <tt>false</tt>.
181   * @param files file to check
182   * @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references
183   * @throws IOException if there is an unexpected error reaching the filesystem.
184   */
185  public Iterable<FileStatus> getUnreferencedFiles(List<FileStatus> files,
186    final SnapshotManager snapshotManager) throws IOException {
187    List<FileStatus> unReferencedFiles = Lists.newArrayList();
188    List<String> snapshotsInProgress = null;
189    boolean refreshed = false;
190    Lock lock = null;
191    if (snapshotManager != null) {
192      lock = snapshotManager.getTakingSnapshotLock().writeLock();
193    }
194    try {
195      if (lock == null || lock.tryLock(LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
196        try {
197          if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
198            LOG.warn("Not checking unreferenced files since snapshot is running, it will "
199              + "skip to clean the HFiles this time");
200            return unReferencedFiles;
201          }
202          ImmutableSet<String> currentCache = cache;
203          for (FileStatus file : files) {
204            String fileName = file.getPath().getName();
205            if (!refreshed && !currentCache.contains(fileName)) {
206              synchronized (this) {
207                refreshCache();
208                currentCache = cache;
209                refreshed = true;
210              }
211            }
212            if (currentCache.contains(fileName)) {
213              continue;
214            }
215            if (snapshotsInProgress == null) {
216              snapshotsInProgress = getSnapshotsInProgress();
217            }
218            if (snapshotsInProgress.contains(fileName)) {
219              continue;
220            }
221            unReferencedFiles.add(file);
222          }
223        } finally {
224          if (lock != null) {
225            lock.unlock();
226          }
227        }
228      } else {
229        LOG.warn("Failed to acquire write lock on taking snapshot after waiting {}ms",
230          LOCK_TIMEOUT_MS);
231      }
232    } catch (InterruptedException e) {
233      LOG.warn("Interrupted while acquiring write lock on taking snapshot");
234      Thread.currentThread().interrupt(); // restore the interrupt flag
235    }
236    return unReferencedFiles;
237  }
238
239  private void refreshCache() throws IOException {
240    // just list the snapshot directory directly, do not check the modification time for the root
241    // snapshot directory, as some file system implementations do not modify the parent directory's
242    // modTime when there are new sub items, for example, S3.
243    FileStatus[] snapshotDirs = CommonFSUtils.listStatus(fs, snapshotDir,
244      p -> !p.getName().equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME));
245
246    // clear the cache, as in the below code, either we will also clear the snapshots, or we will
247    // refill the file name cache again.
248    if (ArrayUtils.isEmpty(snapshotDirs)) {
249      // remove all the remembered snapshots because we don't have any left
250      if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
251        LOG.debug("No snapshots on-disk, clear cache");
252      }
253      this.snapshots = ImmutableMap.of();
254      this.cache = ImmutableSet.of();
255      return;
256    }
257
258    ImmutableSet.Builder<String> cacheBuilder = ImmutableSet.builder();
259    ImmutableMap.Builder<String, SnapshotDirectoryInfo> snapshotsBuilder = ImmutableMap.builder();
260    // iterate over all the cached snapshots and see if we need to update some, it is not an
261    // expensive operation if we do not reload the manifest of snapshots.
262    for (FileStatus snapshotDir : snapshotDirs) {
263      String name = snapshotDir.getPath().getName();
264      SnapshotDirectoryInfo files = snapshots.get(name);
265      // if we don't know about the snapshot or its been modified, we need to update the
266      // files the latter could occur where I create a snapshot, then delete it, and then make a
267      // new snapshot with the same name. We will need to update the cache the information from
268      // that new snapshot, even though it has the same name as the files referenced have
269      // probably changed.
270      if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
271        Collection<String> storedFiles =
272          fileInspector.filesUnderSnapshot(fs, snapshotDir.getPath());
273        files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
274      }
275      // add all the files to cache
276      cacheBuilder.addAll(files.getFiles());
277      snapshotsBuilder.put(name, files);
278    }
279    // set the snapshots we are tracking
280    this.snapshots = snapshotsBuilder.build();
281    this.cache = cacheBuilder.build();
282  }
283
284  List<String> getSnapshotsInProgress() throws IOException {
285    List<String> snapshotInProgress = Lists.newArrayList();
286    // only add those files to the cache, but not to the known snapshots
287
288    FileStatus[] snapshotsInProgress =
289      CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);
290
291    if (!ArrayUtils.isEmpty(snapshotsInProgress)) {
292      for (FileStatus snapshot : snapshotsInProgress) {
293        try {
294          snapshotInProgress
295            .addAll(fileInspector.filesUnderSnapshot(workingFs, snapshot.getPath()));
296        } catch (CorruptedSnapshotException cse) {
297          LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
298        }
299      }
300    }
301    return snapshotInProgress;
302  }
303
304  /**
305   * Simple helper task that just periodically attempts to refresh the cache
306   */
307  public class RefreshCacheTask extends TimerTask {
308    @Override
309    public void run() {
310      synchronized (SnapshotFileCache.this) {
311        try {
312          SnapshotFileCache.this.refreshCache();
313        } catch (IOException e) {
314          LOG.warn("Failed to refresh snapshot hfile cache!", e);
315          // clear all the cached entries if we meet an error
316          cache = ImmutableSet.of();
317          snapshots = ImmutableMap.of();
318        }
319      }
320    }
321  }
322
323  @Override
324  public void stop(String why) {
325    if (!this.stop) {
326      this.stop = true;
327      this.refreshTimer.cancel();
328    }
329  }
330
331  @Override
332  public boolean isStopped() {
333    return this.stop;
334  }
335
336  /**
337   * Information about a snapshot directory
338   */
339  private static class SnapshotDirectoryInfo {
340    long lastModified;
341    Collection<String> files;
342
343    public SnapshotDirectoryInfo(long mtime, Collection<String> files) {
344      this.lastModified = mtime;
345      this.files = files;
346    }
347
348    /** Returns the hfiles in the snapshot when <tt>this</tt> was made. */
349    public Collection<String> getFiles() {
350      return this.files;
351    }
352
353    /**
354     * Check if the snapshot directory has been modified
355     * @param mtime current modification time of the directory
356     * @return <tt>true</tt> if it the modification time of the directory is newer time when we
357     *         created <tt>this</tt>
358     */
359    public boolean hasBeenModified(long mtime) {
360      return this.lastModified < mtime;
361    }
362  }
363}