001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.Timer;
028import java.util.TimerTask;
029import java.util.concurrent.locks.Lock;
030import org.apache.commons.lang3.ArrayUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
037import org.apache.hadoop.hbase.util.FSUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
044
045/**
046 * Intelligently keep track of all the files for all the snapshots.
047 * <p>
048 * A cache of files is kept to avoid querying the {@link FileSystem} frequently. If there is a cache
049 * miss the directory modification time is used to ensure that we don't rescan directories that we
050 * already have in cache. We only check the modification times of the snapshot directories
051 * (/hbase/.snapshot/[snapshot_name]) to determine if the files need to be loaded into the cache.
052 * <p>
053 * New snapshots will be added to the cache and deleted snapshots will be removed when we refresh
054 * the cache. If the files underneath a snapshot directory are changed, but not the snapshot itself,
055 * we will ignore updates to that snapshot's files.
056 * <p>
057 * This is sufficient because each snapshot has its own directory and is added via an atomic rename
058 * <i>once</i>, when the snapshot is created. We don't need to worry about the data in the snapshot
059 * being run.
060 * <p>
061 * Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
062 * also removed from the cache.
063 * <p>
064 * A {@link SnapshotFileCache.SnapshotFileInspector} must be passed when creating <tt>this</tt> to
065 * allow extraction of files under /hbase/.snapshot/[snapshot name] directory, for each snapshot.
066 * This allows you to only cache files under, for instance, all the logs in the .logs directory or
067 * all the files under all the regions.
068 * <p>
069 * <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
070 * snapshots and will attempt to cache files from those snapshots as well.
071 * <p>
072 * Queries about a given file are thread-safe with respect to multiple queries and cache refreshes.
073 */
074@InterfaceAudience.Private
075@InterfaceStability.Evolving
076public class SnapshotFileCache implements Stoppable {
077  interface SnapshotFileInspector {
078    /**
079     * Returns a collection of file names needed by the snapshot.
080     * @param snapshotDir {@link Path} to the snapshot directory to scan.
081     * @return the collection of file names needed by the snapshot.
082     */
083    Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException;
084  }
085
086  private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
087  private volatile boolean stop = false;
088  private final FileSystem fs;
089  private final SnapshotFileInspector fileInspector;
090  private final Path snapshotDir;
091  private final Set<String> cache = new HashSet<>();
092  /**
093   * This is a helper map of information about the snapshot directories so we don't need to rescan
094   * them if they haven't changed since the last time we looked.
095   */
096  private final Map<String, SnapshotDirectoryInfo> snapshots = new HashMap<>();
097  private final Timer refreshTimer;
098
099  /**
100   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
101   * filesystem.
102   * <p>
103   * Immediately loads the file cache.
104   * @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
105   *          hbase root directory
106   * @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
107   * @param refreshThreadName name of the cache refresh thread
108   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
109   * @throws IOException if the {@link FileSystem} or root directory cannot be loaded
110   */
111  public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
112      SnapshotFileInspector inspectSnapshotFiles) throws IOException {
113    this(FSUtils.getCurrentFileSystem(conf), FSUtils.getRootDir(conf), 0, cacheRefreshPeriod,
114      refreshThreadName, inspectSnapshotFiles);
115  }
116
117  /**
118   * Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
119   * filesystem
120   * @param fs {@link FileSystem} where the snapshots are stored
121   * @param rootDir hbase root directory
122   * @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
123   * @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
124   * @param refreshThreadName name of the cache refresh thread
125   * @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
126   */
127  public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
128      long cacheRefreshDelay, String refreshThreadName,
129      SnapshotFileInspector inspectSnapshotFiles) {
130    this.fs = fs;
131    this.fileInspector = inspectSnapshotFiles;
132    this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
133    // periodically refresh the file cache to make sure we aren't superfluously saving files.
134    this.refreshTimer = new Timer(refreshThreadName, true);
135    this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
136      cacheRefreshPeriod);
137  }
138
139  /**
140   * Trigger a cache refresh, even if its before the next cache refresh. Does not affect pending
141   * cache refreshes.
142   * <p/>
143   * Blocks until the cache is refreshed.
144   * <p/>
145   * Exposed for TESTING.
146   */
147  public synchronized void triggerCacheRefreshForTesting() {
148    try {
149      refreshCache();
150    } catch (IOException e) {
151      LOG.warn("Failed to refresh snapshot hfile cache!", e);
152    }
153    LOG.debug("Current cache:" + cache);
154  }
155
156  /**
157   * Check to see if any of the passed file names is contained in any of the snapshots. First checks
158   * an in-memory cache of the files to keep. If its not in the cache, then the cache is refreshed
159   * and the cache checked again for that file. This ensures that we never return files that exist.
160   * <p>
161   * Note this may lead to periodic false positives for the file being referenced. Periodically, the
162   * cache is refreshed even if there are no requests to ensure that the false negatives get removed
163   * eventually. For instance, suppose you have a file in the snapshot and it gets loaded into the
164   * cache. Then at some point later that snapshot is deleted. If the cache has not been refreshed
165   * at that point, cache will still think the file system contains that file and return
166   * <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
167   * on the filesystem, we will never find it and always return <tt>false</tt>.
168   * @param files file to check, NOTE: Relies that files are loaded from hdfs before method is
169   *          called (NOT LAZY)
170   * @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references
171   * @throws IOException if there is an unexpected error reaching the filesystem.
172   */
173  // XXX this is inefficient to synchronize on the method, when what we really need to guard against
174  // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
175  // cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
176  public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
177      final SnapshotManager snapshotManager) throws IOException {
178    List<FileStatus> unReferencedFiles = Lists.newArrayList();
179    boolean refreshed = false;
180    Lock lock = null;
181    if (snapshotManager != null) {
182      lock = snapshotManager.getTakingSnapshotLock().writeLock();
183    }
184    if (lock == null || lock.tryLock()) {
185      try {
186        if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
187          LOG.warn("Not checking unreferenced files since snapshot is running, it will " +
188            "skip to clean the HFiles this time");
189          return unReferencedFiles;
190        }
191        for (FileStatus file : files) {
192          String fileName = file.getPath().getName();
193          if (!refreshed && !cache.contains(fileName)) {
194            refreshCache();
195            refreshed = true;
196          }
197          if (cache.contains(fileName)) {
198            continue;
199          }
200          unReferencedFiles.add(file);
201        }
202      } finally {
203        if (lock != null) {
204          lock.unlock();
205        }
206      }
207    }
208    return unReferencedFiles;
209  }
210
211  private void refreshCache() throws IOException {
212    // just list the snapshot directory directly, do not check the modification time for the root
213    // snapshot directory, as some file system implementations do not modify the parent directory's
214    // modTime when there are new sub items, for example, S3.
215    FileStatus[] snapshotDirs = FSUtils.listStatus(fs, snapshotDir,
216      p -> !p.getName().equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME));
217
218    // clear the cache, as in the below code, either we will also clear the snapshots, or we will
219    // refill the file name cache again.
220    this.cache.clear();
221    if (ArrayUtils.isEmpty(snapshotDirs)) {
222      // remove all the remembered snapshots because we don't have any left
223      if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
224        LOG.debug("No snapshots on-disk, clear cache");
225      }
226      this.snapshots.clear();
227      return;
228    }
229
230    // iterate over all the cached snapshots and see if we need to update some, it is not an
231    // expensive operation if we do not reload the manifest of snapshots.
232    Map<String, SnapshotDirectoryInfo> newSnapshots = new HashMap<>();
233    for (FileStatus snapshotDir : snapshotDirs) {
234      String name = snapshotDir.getPath().getName();
235      SnapshotDirectoryInfo files = this.snapshots.remove(name);
236      // if we don't know about the snapshot or its been modified, we need to update the
237      // files the latter could occur where I create a snapshot, then delete it, and then make a
238      // new snapshot with the same name. We will need to update the cache the information from
239      // that new snapshot, even though it has the same name as the files referenced have
240      // probably changed.
241      if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
242        Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshotDir.getPath());
243        files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
244      }
245      // add all the files to cache
246      this.cache.addAll(files.getFiles());
247      newSnapshots.put(name, files);
248    }
249    // set the snapshots we are tracking
250    this.snapshots.clear();
251    this.snapshots.putAll(newSnapshots);
252  }
253
254  /**
255   * Simple helper task that just periodically attempts to refresh the cache
256   */
257  public class RefreshCacheTask extends TimerTask {
258    @Override
259    public void run() {
260      synchronized (SnapshotFileCache.this) {
261        try {
262          SnapshotFileCache.this.refreshCache();
263        } catch (IOException e) {
264          LOG.warn("Failed to refresh snapshot hfile cache!", e);
265          // clear all the cached entries if we meet an error
266          cache.clear();
267          snapshots.clear();
268        }
269      }
270
271    }
272  }
273
274  @Override
275  public void stop(String why) {
276    if (!this.stop) {
277      this.stop = true;
278      this.refreshTimer.cancel();
279    }
280
281  }
282
283  @Override
284  public boolean isStopped() {
285    return this.stop;
286  }
287
288  /**
289   * Information about a snapshot directory
290   */
291  private static class SnapshotDirectoryInfo {
292    long lastModified;
293    Collection<String> files;
294
295    public SnapshotDirectoryInfo(long mtime, Collection<String> files) {
296      this.lastModified = mtime;
297      this.files = files;
298    }
299
300    /**
301     * @return the hfiles in the snapshot when <tt>this</tt> was made.
302     */
303    public Collection<String> getFiles() {
304      return this.files;
305    }
306
307    /**
308     * Check if the snapshot directory has been modified
309     * @param mtime current modification time of the directory
310     * @return <tt>true</tt> if it the modification time of the directory is newer time when we
311     *         created <tt>this</tt>
312     */
313    public boolean hasBeenModified(long mtime) {
314      return this.lastModified < mtime;
315    }
316  }
317}