001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.Timer;
028import java.util.TimerTask;
029import java.util.concurrent.locks.Lock;
030import org.apache.commons.lang3.ArrayUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
037import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
038import org.apache.hadoop.hbase.util.CommonFSUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
046
047/**
048 * Intelligently keep track of all the files for all the snapshots.
049 * <p>
050 * A cache of files is kept to avoid querying the {@link FileSystem} frequently. If there is a cache
051 * miss the directory modification time is used to ensure that we don't rescan directories that we
052 * already have in cache. We only check the modification times of the snapshot directories
053 * (/hbase/.snapshot/[snapshot_name]) to determine if the files need to be loaded into the cache.
054 * <p>
055 * New snapshots will be added to the cache and deleted snapshots will be removed when we refresh
056 * the cache. If the files underneath a snapshot directory are changed, but not the snapshot itself,
057 * we will ignore updates to that snapshot's files.
058 * <p>
059 * This is sufficient because each snapshot has its own directory and is added via an atomic rename
060 * <i>once</i>, when the snapshot is created. We don't need to worry about the data in the snapshot
061 * being run.
062 * <p>
063 * Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
064 * also removed from the cache.
065 * <p>
066 * A {@link SnapshotFileCache.SnapshotFileInspector} must be passed when creating <tt>this</tt> to
067 * allow extraction of files under /hbase/.snapshot/[snapshot name] directory, for each snapshot.
068 * This allows you to only cache files under, for instance, all the logs in the .logs directory or
069 * all the files under all the regions.
070 * <p>
071 * <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
072 * snapshots and will attempt to cache files from those snapshots as well.
073 * <p>
074 * Queries about a given file are thread-safe with respect to multiple queries and cache refreshes.
075 */
076@InterfaceAudience.Private
077@InterfaceStability.Evolving
078public class SnapshotFileCache implements Stoppable {
079  interface SnapshotFileInspector {
080    /**
081     * Returns a collection of file names needed by the snapshot.
082     * @param fs {@link FileSystem} where snapshot mainifest files are stored
083     * @param snapshotDir {@link Path} to the snapshot directory to scan.
084     * @return the collection of file names needed by the snapshot.
085     */
086    Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
087      throws IOException;
088  }
089
090  private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
091  private volatile boolean stop = false;
092  private final FileSystem fs, workingFs;
093  private final SnapshotFileInspector fileInspector;
094  private final Path snapshotDir, workingSnapshotDir;
095  private final Set<String> cache = new HashSet<>();
096  /**
097   * This is a helper map of information about the snapshot directories so we don't need to rescan
098   * them if they haven't changed since the last time we looked.
099   */
100  private final Map<String, SnapshotDirectoryInfo> snapshots = new HashMap<>();
101  private final Timer refreshTimer;
102
103  /**
104   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
105   * filesystem.
106   * <p>
107   * Immediately loads the file cache.
108   * @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
109   *          hbase root directory
110   * @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
111   * @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
112   * @param refreshThreadName name of the cache refresh thread
113   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
114   * @throws IOException if the {@link FileSystem} or root directory cannot be loaded
115   */
116  public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
117    String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
118    this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
119      SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf).
120        getFileSystem(conf),
121      SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
122      cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
123  }
124
125  /**
126   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
127   * filesystem
128   * @param fs {@link FileSystem} where the snapshots are stored
129   * @param rootDir hbase root directory
130   * @param workingFs {@link FileSystem} where ongoing snapshot mainifest files are stored
131   * @param workingDir Location to store ongoing snapshot manifest files
132   * @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
133   * @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
134   * @param refreshThreadName name of the cache refresh thread
135   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
136   */
137  public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
138    long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
139    SnapshotFileInspector inspectSnapshotFiles) {
140    this.fs = fs;
141    this.workingFs = workingFs;
142    this.workingSnapshotDir = workingDir;
143    this.fileInspector = inspectSnapshotFiles;
144    this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
145    // periodically refresh the file cache to make sure we aren't superfluously saving files.
146    this.refreshTimer = new Timer(refreshThreadName, true);
147    this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
148      cacheRefreshPeriod);
149  }
150
151  /**
152   * Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending
153   * cache refreshes.
154   * <p/>
155   * Blocks until the cache is refreshed.
156   * <p/>
157   * Exposed for TESTING.
158   */
159  public synchronized void triggerCacheRefreshForTesting() {
160    try {
161      refreshCache();
162    } catch (IOException e) {
163      LOG.warn("Failed to refresh snapshot hfile cache!", e);
164    }
165    LOG.debug("Current cache:" + cache);
166  }
167
168  /**
169   * Check to see if any of the passed file names is contained in any of the snapshots. First checks
170   * an in-memory cache of the files to keep. If its not in the cache, then the cache is refreshed
171   * and the cache checked again for that file. This ensures that we never return files that exist.
172   * <p>
173   * Note this may lead to periodic false positives for the file being referenced. Periodically, the
174   * cache is refreshed even if there are no requests to ensure that the false negatives get removed
175   * eventually. For instance, suppose you have a file in the snapshot and it gets loaded into the
176   * cache. Then at some point later that snapshot is deleted. If the cache has not been refreshed
177   * at that point, cache will still think the file system contains that file and return
178   * <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
179   * on the filesystem, we will never find it and always return <tt>false</tt>.
180   * @param files file to check, NOTE: Relies that files are loaded from hdfs before method is
181   *          called (NOT LAZY)
182   * @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references
183   * @throws IOException if there is an unexpected error reaching the filesystem.
184   */
185  // XXX this is inefficient to synchronize on the method, when what we really need to guard against
186  // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
187  // cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
188  public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
189      final SnapshotManager snapshotManager) throws IOException {
190    List<FileStatus> unReferencedFiles = Lists.newArrayList();
191    List<String> snapshotsInProgress = null;
192    boolean refreshed = false;
193    Lock lock = null;
194    if (snapshotManager != null) {
195      lock = snapshotManager.getTakingSnapshotLock().writeLock();
196    }
197    if (lock == null || lock.tryLock()) {
198      try {
199        if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
200          LOG.warn("Not checking unreferenced files since snapshot is running, it will " +
201            "skip to clean the HFiles this time");
202          return unReferencedFiles;
203        }
204        for (FileStatus file : files) {
205          String fileName = file.getPath().getName();
206          if (!refreshed && !cache.contains(fileName)) {
207            refreshCache();
208            refreshed = true;
209          }
210          if (cache.contains(fileName)) {
211            continue;
212          }
213          if (snapshotsInProgress == null) {
214            snapshotsInProgress = getSnapshotsInProgress();
215          }
216          if (snapshotsInProgress.contains(fileName)) {
217            continue;
218          }
219          unReferencedFiles.add(file);
220        }
221      } finally {
222        if (lock != null) {
223          lock.unlock();
224        }
225      }
226    }
227    return unReferencedFiles;
228  }
229
230  private void refreshCache() throws IOException {
231    // just list the snapshot directory directly, do not check the modification time for the root
232    // snapshot directory, as some file system implementations do not modify the parent directory's
233    // modTime when there are new sub items, for example, S3.
234    FileStatus[] snapshotDirs = CommonFSUtils.listStatus(fs, snapshotDir,
235      p -> !p.getName().equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME));
236
237    // clear the cache, as in the below code, either we will also clear the snapshots, or we will
238    // refill the file name cache again.
239    this.cache.clear();
240    if (ArrayUtils.isEmpty(snapshotDirs)) {
241      // remove all the remembered snapshots because we don't have any left
242      if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
243        LOG.debug("No snapshots on-disk, clear cache");
244      }
245      this.snapshots.clear();
246      return;
247    }
248
249    // iterate over all the cached snapshots and see if we need to update some, it is not an
250    // expensive operation if we do not reload the manifest of snapshots.
251    Map<String, SnapshotDirectoryInfo> newSnapshots = new HashMap<>();
252    for (FileStatus snapshotDir : snapshotDirs) {
253      String name = snapshotDir.getPath().getName();
254      SnapshotDirectoryInfo files = this.snapshots.remove(name);
255      // if we don't know about the snapshot or its been modified, we need to update the
256      // files the latter could occur where I create a snapshot, then delete it, and then make a
257      // new snapshot with the same name. We will need to update the cache the information from
258      // that new snapshot, even though it has the same name as the files referenced have
259      // probably changed.
260      if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
261        Collection<String> storedFiles = fileInspector.filesUnderSnapshot(fs,
262          snapshotDir.getPath());
263        files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
264      }
265      // add all the files to cache
266      this.cache.addAll(files.getFiles());
267      newSnapshots.put(name, files);
268    }
269    // set the snapshots we are tracking
270    this.snapshots.clear();
271    this.snapshots.putAll(newSnapshots);
272  }
273
274  @VisibleForTesting
275  List<String> getSnapshotsInProgress() throws IOException {
276    List<String> snapshotInProgress = Lists.newArrayList();
277    // only add those files to the cache, but not to the known snapshots
278
279    FileStatus[] snapshotsInProgress = CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);
280
281    if (!ArrayUtils.isEmpty(snapshotsInProgress)) {
282      for (FileStatus snapshot : snapshotsInProgress) {
283        try {
284          snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(workingFs,
285            snapshot.getPath()));
286        } catch (CorruptedSnapshotException cse) {
287          LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
288        }
289      }
290    }
291    return snapshotInProgress;
292  }
293
294  /**
295   * Simple helper task that just periodically attempts to refresh the cache
296   */
297  public class RefreshCacheTask extends TimerTask {
298    @Override
299    public void run() {
300      synchronized (SnapshotFileCache.this) {
301        try {
302          SnapshotFileCache.this.refreshCache();
303        } catch (IOException e) {
304          LOG.warn("Failed to refresh snapshot hfile cache!", e);
305          // clear all the cached entries if we meet an error
306          cache.clear();
307          snapshots.clear();
308        }
309      }
310
311    }
312  }
313
314  @Override
315  public void stop(String why) {
316    if (!this.stop) {
317      this.stop = true;
318      this.refreshTimer.cancel();
319    }
320
321  }
322
323  @Override
324  public boolean isStopped() {
325    return this.stop;
326  }
327
328  /**
329   * Information about a snapshot directory
330   */
331  private static class SnapshotDirectoryInfo {
332    long lastModified;
333    Collection<String> files;
334
335    public SnapshotDirectoryInfo(long mtime, Collection<String> files) {
336      this.lastModified = mtime;
337      this.files = files;
338    }
339
340    /**
341     * @return the hfiles in the snapshot when <tt>this</tt> was made.
342     */
343    public Collection<String> getFiles() {
344      return this.files;
345    }
346
347    /**
348     * Check if the snapshot directory has been modified
349     * @param mtime current modification time of the directory
350     * @return <tt>true</tt> if it the modification time of the directory is newer time when we
351     *         created <tt>this</tt>
352     */
353    public boolean hasBeenModified(long mtime) {
354      return this.lastModified < mtime;
355    }
356  }
357}