001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.List;
023import java.util.Timer;
024import java.util.TimerTask;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.Lock;
027import org.apache.commons.lang3.ArrayUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
034import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
035import org.apache.hadoop.hbase.util.CommonFSUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.yetus.audience.InterfaceStability;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
042import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
043import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
044
045/**
046 * Intelligently keep track of all the files for all the snapshots.
047 * <p>
048 * A cache of files is kept to avoid querying the {@link FileSystem} frequently. If there is a cache
049 * miss the directory modification time is used to ensure that we don't rescan directories that we
050 * already have in cache. We only check the modification times of the snapshot directories
051 * (/hbase/.snapshot/[snapshot_name]) to determine if the files need to be loaded into the cache.
052 * <p>
053 * New snapshots will be added to the cache and deleted snapshots will be removed when we refresh
054 * the cache. If the files underneath a snapshot directory are changed, but not the snapshot itself,
055 * we will ignore updates to that snapshot's files.
056 * <p>
057 * This is sufficient because each snapshot has its own directory and is added via an atomic rename
058 * <i>once</i>, when the snapshot is created. We don't need to worry about the data in the snapshot
059 * being run.
060 * <p>
061 * Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
062 * also removed from the cache.
063 * <p>
064 * A {@link SnapshotFileCache.SnapshotFileInspector} must be passed when creating <tt>this</tt> to
065 * allow extraction of files under /hbase/.snapshot/[snapshot name] directory, for each snapshot.
066 * This allows you to only cache files under, for instance, all the logs in the .logs directory or
067 * all the files under all the regions.
068 * <p>
069 * <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
070 * snapshots and will attempt to cache files from those snapshots as well.
071 * <p>
072 * Queries about a given file are thread-safe with respect to multiple queries and cache refreshes.
073 */
074@InterfaceAudience.Private
075@InterfaceStability.Evolving
076public class SnapshotFileCache implements Stoppable {
077  interface SnapshotFileInspector {
078    /**
079     * Returns a collection of file names needed by the snapshot.
080     * @param fs          {@link FileSystem} where snapshot mainifest files are stored
081     * @param snapshotDir {@link Path} to the snapshot directory to scan.
082     * @return the collection of file names needed by the snapshot.
083     */
084    Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
085      throws IOException;
086  }
087
088  private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
089  private volatile boolean stop = false;
090  private final FileSystem fs, workingFs;
091  private final SnapshotFileInspector fileInspector;
092  private final Path snapshotDir, workingSnapshotDir;
093  private volatile ImmutableSet<String> cache = ImmutableSet.of();
094  /**
095   * This is a helper map of information about the snapshot directories so we don't need to rescan
096   * them if they haven't changed since the last time we looked.
097   */
098  private ImmutableMap<String, SnapshotDirectoryInfo> snapshots = ImmutableMap.of();
099  private final Timer refreshTimer;
100
101  private static final int LOCK_TIMEOUT_MS = 30000;
102
103  /**
104   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
105   * filesystem.
106   * <p>
107   * Immediately loads the file cache.
108   * @param conf                 to extract the configured {@link FileSystem} where the snapshots
109   *                             are stored and hbase root directory
110   * @param cacheRefreshPeriod   frequency (ms) with which the cache should be refreshed
111   * @param cacheRefreshDelay    amount of time to wait for the cache to be refreshed
112   * @param refreshThreadName    name of the cache refresh thread
113   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
114   * @throws IOException if the {@link FileSystem} or root directory cannot be loaded
115   */
116  public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
117    String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
118    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
119      SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf)
120        .getFileSystem(conf),
121      SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
122      cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
123  }
124
125  /**
126   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
127   * filesystem
128   * @param fs                   {@link FileSystem} where the snapshots are stored
129   * @param rootDir              hbase root directory
130   * @param workingFs            {@link FileSystem} where ongoing snapshot mainifest files are
131   *                             stored
132   * @param workingDir           Location to store ongoing snapshot manifest files
133   * @param cacheRefreshPeriod   period (ms) with which the cache should be refreshed
134   * @param cacheRefreshDelay    amount of time to wait for the cache to be refreshed
135   * @param refreshThreadName    name of the cache refresh thread
136   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
137   */
138  public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
139    long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
140    SnapshotFileInspector inspectSnapshotFiles) {
141    this.fs = fs;
142    this.workingFs = workingFs;
143    this.workingSnapshotDir = workingDir;
144    this.fileInspector = inspectSnapshotFiles;
145    this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
146    // periodically refresh the file cache to make sure we aren't superfluously saving files.
147    this.refreshTimer = new Timer(refreshThreadName, true);
148    this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
149      cacheRefreshPeriod);
150  }
151
152  /**
153   * Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending
154   * cache refreshes.
155   * <p/>
156   * Blocks until the cache is refreshed.
157   * <p/>
158   * Exposed for TESTING.
159   */
160  public synchronized void triggerCacheRefreshForTesting() {
161    try {
162      refreshCache();
163    } catch (IOException e) {
164      LOG.warn("Failed to refresh snapshot hfile cache!", e);
165    }
166    LOG.debug("Current cache:" + cache);
167  }
168
169  /**
170   * Check to see if any of the passed file names is contained in any of the snapshots. First checks
171   * an in-memory cache of the files to keep. If its not in the cache, then the cache is refreshed
172   * and the cache checked again for that file. This ensures that we never return files that exist.
173   * <p>
174   * Note this may lead to periodic false positives for the file being referenced. Periodically, the
175   * cache is refreshed even if there are no requests to ensure that the false negatives get removed
176   * eventually. For instance, suppose you have a file in the snapshot and it gets loaded into the
177   * cache. Then at some point later that snapshot is deleted. If the cache has not been refreshed
178   * at that point, cache will still think the file system contains that file and return
179   * <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
180   * on the filesystem, we will never find it and always return <tt>false</tt>.
181   * @param files file to check, NOTE: Relies that files are loaded from hdfs before method is
182   *              called (NOT LAZY)
183   * @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references
184   * @throws IOException if there is an unexpected error reaching the filesystem.
185   */
186  // XXX this is inefficient to synchronize on the method, when what we really need to guard against
187  // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
188  // cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
189  public Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
190    final SnapshotManager snapshotManager) throws IOException {
191    List<FileStatus> unReferencedFiles = Lists.newArrayList();
192    List<String> snapshotsInProgress = null;
193    boolean refreshed = false;
194    Lock lock = null;
195    if (snapshotManager != null) {
196      lock = snapshotManager.getTakingSnapshotLock().writeLock();
197    }
198    try {
199      if (lock == null || lock.tryLock(LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
200        try {
201          if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
202            LOG.warn("Not checking unreferenced files since snapshot is running, it will "
203              + "skip to clean the HFiles this time");
204            return unReferencedFiles;
205          }
206          ImmutableSet<String> currentCache = cache;
207          for (FileStatus file : files) {
208            String fileName = file.getPath().getName();
209            if (!refreshed && !currentCache.contains(fileName)) {
210              synchronized (this) {
211                refreshCache();
212                currentCache = cache;
213                refreshed = true;
214              }
215            }
216            if (currentCache.contains(fileName)) {
217              continue;
218            }
219            if (snapshotsInProgress == null) {
220              snapshotsInProgress = getSnapshotsInProgress();
221            }
222            if (snapshotsInProgress.contains(fileName)) {
223              continue;
224            }
225            unReferencedFiles.add(file);
226          }
227        } finally {
228          if (lock != null) {
229            lock.unlock();
230          }
231        }
232      } else {
233        LOG.warn("Failed to acquire write lock on taking snapshot after waiting {}ms",
234          LOCK_TIMEOUT_MS);
235      }
236    } catch (InterruptedException e) {
237      LOG.warn("Interrupted while acquiring write lock on taking snapshot");
238      Thread.currentThread().interrupt(); // restore the interrupt flag
239    }
240    return unReferencedFiles;
241  }
242
243  private void refreshCache() throws IOException {
244    // just list the snapshot directory directly, do not check the modification time for the root
245    // snapshot directory, as some file system implementations do not modify the parent directory's
246    // modTime when there are new sub items, for example, S3.
247    FileStatus[] snapshotDirs = CommonFSUtils.listStatus(fs, snapshotDir,
248      p -> !p.getName().equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME));
249
250    // clear the cache, as in the below code, either we will also clear the snapshots, or we will
251    // refill the file name cache again.
252    if (ArrayUtils.isEmpty(snapshotDirs)) {
253      // remove all the remembered snapshots because we don't have any left
254      if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
255        LOG.debug("No snapshots on-disk, clear cache");
256      }
257      this.snapshots = ImmutableMap.of();
258      this.cache = ImmutableSet.of();
259      return;
260    }
261
262    ImmutableSet.Builder<String> cacheBuilder = ImmutableSet.builder();
263    ImmutableMap.Builder<String, SnapshotDirectoryInfo> snapshotsBuilder = ImmutableMap.builder();
264    // iterate over all the cached snapshots and see if we need to update some, it is not an
265    // expensive operation if we do not reload the manifest of snapshots.
266    for (FileStatus snapshotDir : snapshotDirs) {
267      String name = snapshotDir.getPath().getName();
268      SnapshotDirectoryInfo files = snapshots.get(name);
269      // if we don't know about the snapshot or its been modified, we need to update the
270      // files the latter could occur where I create a snapshot, then delete it, and then make a
271      // new snapshot with the same name. We will need to update the cache the information from
272      // that new snapshot, even though it has the same name as the files referenced have
273      // probably changed.
274      if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
275        Collection<String> storedFiles =
276          fileInspector.filesUnderSnapshot(fs, snapshotDir.getPath());
277        files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
278      }
279      // add all the files to cache
280      cacheBuilder.addAll(files.getFiles());
281      snapshotsBuilder.put(name, files);
282    }
283    // set the snapshots we are tracking
284    this.snapshots = snapshotsBuilder.build();
285    this.cache = cacheBuilder.build();
286  }
287
288  List<String> getSnapshotsInProgress() throws IOException {
289    List<String> snapshotInProgress = Lists.newArrayList();
290    // only add those files to the cache, but not to the known snapshots
291
292    FileStatus[] snapshotsInProgress =
293      CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);
294
295    if (!ArrayUtils.isEmpty(snapshotsInProgress)) {
296      for (FileStatus snapshot : snapshotsInProgress) {
297        try {
298          snapshotInProgress
299            .addAll(fileInspector.filesUnderSnapshot(workingFs, snapshot.getPath()));
300        } catch (CorruptedSnapshotException cse) {
301          LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
302        }
303      }
304    }
305    return snapshotInProgress;
306  }
307
308  /**
309   * Simple helper task that just periodically attempts to refresh the cache
310   */
311  public class RefreshCacheTask extends TimerTask {
312    @Override
313    public void run() {
314      synchronized (SnapshotFileCache.this) {
315        try {
316          SnapshotFileCache.this.refreshCache();
317        } catch (IOException e) {
318          LOG.warn("Failed to refresh snapshot hfile cache!", e);
319          // clear all the cached entries if we meet an error
320          cache = ImmutableSet.of();
321          snapshots = ImmutableMap.of();
322        }
323      }
324    }
325  }
326
327  @Override
328  public void stop(String why) {
329    if (!this.stop) {
330      this.stop = true;
331      this.refreshTimer.cancel();
332    }
333  }
334
335  @Override
336  public boolean isStopped() {
337    return this.stop;
338  }
339
340  /**
341   * Information about a snapshot directory
342   */
343  private static class SnapshotDirectoryInfo {
344    long lastModified;
345    Collection<String> files;
346
347    public SnapshotDirectoryInfo(long mtime, Collection<String> files) {
348      this.lastModified = mtime;
349      this.files = files;
350    }
351
352    /** Returns the hfiles in the snapshot when <tt>this</tt> was made. */
353    public Collection<String> getFiles() {
354      return this.files;
355    }
356
357    /**
358     * Check if the snapshot directory has been modified
359     * @param mtime current modification time of the directory
360     * @return <tt>true</tt> if it the modification time of the directory is newer time when we
361     *         created <tt>this</tt>
362     */
363    public boolean hasBeenModified(long mtime) {
364      return this.lastModified < mtime;
365    }
366  }
367}