001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.Timer;
028import java.util.TimerTask;
029import java.util.concurrent.locks.Lock;
030import org.apache.commons.lang3.ArrayUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
037import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
038import org.apache.hadoop.hbase.util.CommonFSUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
045
046/**
047 * Intelligently keep track of all the files for all the snapshots.
048 * <p>
049 * A cache of files is kept to avoid querying the {@link FileSystem} frequently. If there is a cache
050 * miss the directory modification time is used to ensure that we don't rescan directories that we
051 * already have in cache. We only check the modification times of the snapshot directories
052 * (/hbase/.snapshot/[snapshot_name]) to determine if the files need to be loaded into the cache.
053 * <p>
054 * New snapshots will be added to the cache and deleted snapshots will be removed when we refresh
055 * the cache. If the files underneath a snapshot directory are changed, but not the snapshot itself,
056 * we will ignore updates to that snapshot's files.
057 * <p>
058 * This is sufficient because each snapshot has its own directory and is added via an atomic rename
059 * <i>once</i>, when the snapshot is created. We don't need to worry about the data in the snapshot
060 * being run.
061 * <p>
062 * Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
063 * also removed from the cache.
064 * <p>
065 * A {@link SnapshotFileCache.SnapshotFileInspector} must be passed when creating <tt>this</tt> to
066 * allow extraction of files under /hbase/.snapshot/[snapshot name] directory, for each snapshot.
067 * This allows you to only cache files under, for instance, all the logs in the .logs directory or
068 * all the files under all the regions.
069 * <p>
070 * <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
071 * snapshots and will attempt to cache files from those snapshots as well.
072 * <p>
073 * Queries about a given file are thread-safe with respect to multiple queries and cache refreshes.
074 */
075@InterfaceAudience.Private
076@InterfaceStability.Evolving
077public class SnapshotFileCache implements Stoppable {
078  interface SnapshotFileInspector {
079    /**
080     * Returns a collection of file names needed by the snapshot.
081     * @param fs {@link FileSystem} where snapshot mainifest files are stored
082     * @param snapshotDir {@link Path} to the snapshot directory to scan.
083     * @return the collection of file names needed by the snapshot.
084     */
085    Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
086      throws IOException;
087  }
088
089  private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
090  private volatile boolean stop = false;
091  private final FileSystem fs, workingFs;
092  private final SnapshotFileInspector fileInspector;
093  private final Path snapshotDir, workingSnapshotDir;
094  private final Set<String> cache = new HashSet<>();
095  /**
096   * This is a helper map of information about the snapshot directories so we don't need to rescan
097   * them if they haven't changed since the last time we looked.
098   */
099  private final Map<String, SnapshotDirectoryInfo> snapshots = new HashMap<>();
100  private final Timer refreshTimer;
101
102  /**
103   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
104   * filesystem.
105   * <p>
106   * Immediately loads the file cache.
107   * @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
108   *          hbase root directory
109   * @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
110   * @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
111   * @param refreshThreadName name of the cache refresh thread
112   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
113   * @throws IOException if the {@link FileSystem} or root directory cannot be loaded
114   */
115  public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
116    String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
117    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
118      SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf).
119        getFileSystem(conf),
120      SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
121      cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
122  }
123
124  /**
125   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
126   * filesystem
127   * @param fs {@link FileSystem} where the snapshots are stored
128   * @param rootDir hbase root directory
129   * @param workingFs {@link FileSystem} where ongoing snapshot mainifest files are stored
130   * @param workingDir Location to store ongoing snapshot manifest files
131   * @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
132   * @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
133   * @param refreshThreadName name of the cache refresh thread
134   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
135   */
136  public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
137    long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
138    SnapshotFileInspector inspectSnapshotFiles) {
139    this.fs = fs;
140    this.workingFs = workingFs;
141    this.workingSnapshotDir = workingDir;
142    this.fileInspector = inspectSnapshotFiles;
143    this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
144    // periodically refresh the file cache to make sure we aren't superfluously saving files.
145    this.refreshTimer = new Timer(refreshThreadName, true);
146    this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
147      cacheRefreshPeriod);
148  }
149
150  /**
151   * Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending
152   * cache refreshes.
153   * <p/>
154   * Blocks until the cache is refreshed.
155   * <p/>
156   * Exposed for TESTING.
157   */
158  public synchronized void triggerCacheRefreshForTesting() {
159    try {
160      refreshCache();
161    } catch (IOException e) {
162      LOG.warn("Failed to refresh snapshot hfile cache!", e);
163    }
164    LOG.debug("Current cache:" + cache);
165  }
166
167  /**
168   * Check to see if any of the passed file names is contained in any of the snapshots. First checks
169   * an in-memory cache of the files to keep. If its not in the cache, then the cache is refreshed
170   * and the cache checked again for that file. This ensures that we never return files that exist.
171   * <p>
172   * Note this may lead to periodic false positives for the file being referenced. Periodically, the
173   * cache is refreshed even if there are no requests to ensure that the false negatives get removed
174   * eventually. For instance, suppose you have a file in the snapshot and it gets loaded into the
175   * cache. Then at some point later that snapshot is deleted. If the cache has not been refreshed
176   * at that point, cache will still think the file system contains that file and return
177   * <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
178   * on the filesystem, we will never find it and always return <tt>false</tt>.
179   * @param files file to check, NOTE: Relies that files are loaded from hdfs before method is
180   *          called (NOT LAZY)
181   * @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references
182   * @throws IOException if there is an unexpected error reaching the filesystem.
183   */
184  // XXX this is inefficient to synchronize on the method, when what we really need to guard against
185  // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
186  // cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
187  public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
188      final SnapshotManager snapshotManager) throws IOException {
189    List<FileStatus> unReferencedFiles = Lists.newArrayList();
190    List<String> snapshotsInProgress = null;
191    boolean refreshed = false;
192    Lock lock = null;
193    if (snapshotManager != null) {
194      lock = snapshotManager.getTakingSnapshotLock().writeLock();
195    }
196    if (lock == null || lock.tryLock()) {
197      try {
198        if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
199          LOG.warn("Not checking unreferenced files since snapshot is running, it will " +
200            "skip to clean the HFiles this time");
201          return unReferencedFiles;
202        }
203        for (FileStatus file : files) {
204          String fileName = file.getPath().getName();
205          if (!refreshed && !cache.contains(fileName)) {
206            refreshCache();
207            refreshed = true;
208          }
209          if (cache.contains(fileName)) {
210            continue;
211          }
212          if (snapshotsInProgress == null) {
213            snapshotsInProgress = getSnapshotsInProgress();
214          }
215          if (snapshotsInProgress.contains(fileName)) {
216            continue;
217          }
218          unReferencedFiles.add(file);
219        }
220      } finally {
221        if (lock != null) {
222          lock.unlock();
223        }
224      }
225    }
226    return unReferencedFiles;
227  }
228
229  private void refreshCache() throws IOException {
230    // just list the snapshot directory directly, do not check the modification time for the root
231    // snapshot directory, as some file system implementations do not modify the parent directory's
232    // modTime when there are new sub items, for example, S3.
233    FileStatus[] snapshotDirs = CommonFSUtils.listStatus(fs, snapshotDir,
234      p -> !p.getName().equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME));
235
236    // clear the cache, as in the below code, either we will also clear the snapshots, or we will
237    // refill the file name cache again.
238    this.cache.clear();
239    if (ArrayUtils.isEmpty(snapshotDirs)) {
240      // remove all the remembered snapshots because we don't have any left
241      if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
242        LOG.debug("No snapshots on-disk, clear cache");
243      }
244      this.snapshots.clear();
245      return;
246    }
247
248    // iterate over all the cached snapshots and see if we need to update some, it is not an
249    // expensive operation if we do not reload the manifest of snapshots.
250    Map<String, SnapshotDirectoryInfo> newSnapshots = new HashMap<>();
251    for (FileStatus snapshotDir : snapshotDirs) {
252      String name = snapshotDir.getPath().getName();
253      SnapshotDirectoryInfo files = this.snapshots.remove(name);
254      // if we don't know about the snapshot or its been modified, we need to update the
255      // files the latter could occur where I create a snapshot, then delete it, and then make a
256      // new snapshot with the same name. We will need to update the cache the information from
257      // that new snapshot, even though it has the same name as the files referenced have
258      // probably changed.
259      if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
260        Collection<String> storedFiles = fileInspector.filesUnderSnapshot(fs,
261          snapshotDir.getPath());
262        files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
263      }
264      // add all the files to cache
265      this.cache.addAll(files.getFiles());
266      newSnapshots.put(name, files);
267    }
268    // set the snapshots we are tracking
269    this.snapshots.clear();
270    this.snapshots.putAll(newSnapshots);
271  }
272
273  List<String> getSnapshotsInProgress() throws IOException {
274    List<String> snapshotInProgress = Lists.newArrayList();
275    // only add those files to the cache, but not to the known snapshots
276
277    FileStatus[] snapshotsInProgress = CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);
278
279    if (!ArrayUtils.isEmpty(snapshotsInProgress)) {
280      for (FileStatus snapshot : snapshotsInProgress) {
281        try {
282          snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(workingFs,
283            snapshot.getPath()));
284        } catch (CorruptedSnapshotException cse) {
285          LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
286        }
287      }
288    }
289    return snapshotInProgress;
290  }
291
292  /**
293   * Simple helper task that just periodically attempts to refresh the cache
294   */
295  public class RefreshCacheTask extends TimerTask {
296    @Override
297    public void run() {
298      synchronized (SnapshotFileCache.this) {
299        try {
300          SnapshotFileCache.this.refreshCache();
301        } catch (IOException e) {
302          LOG.warn("Failed to refresh snapshot hfile cache!", e);
303          // clear all the cached entries if we meet an error
304          cache.clear();
305          snapshots.clear();
306        }
307      }
308
309    }
310  }
311
312  @Override
313  public void stop(String why) {
314    if (!this.stop) {
315      this.stop = true;
316      this.refreshTimer.cancel();
317    }
318
319  }
320
321  @Override
322  public boolean isStopped() {
323    return this.stop;
324  }
325
326  /**
327   * Information about a snapshot directory
328   */
329  private static class SnapshotDirectoryInfo {
330    long lastModified;
331    Collection<String> files;
332
333    public SnapshotDirectoryInfo(long mtime, Collection<String> files) {
334      this.lastModified = mtime;
335      this.files = files;
336    }
337
338    /**
339     * @return the hfiles in the snapshot when <tt>this</tt> was made.
340     */
341    public Collection<String> getFiles() {
342      return this.files;
343    }
344
345    /**
346     * Check if the snapshot directory has been modified
347     * @param mtime current modification time of the directory
348     * @return <tt>true</tt> if it the modification time of the directory is newer time when we
349     *         created <tt>this</tt>
350     */
351    public boolean hasBeenModified(long mtime) {
352      return this.lastModified < mtime;
353    }
354  }
355}