1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.snapshot;
19
20 import java.io.FileNotFoundException;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.Timer;
29 import java.util.TimerTask;
30 import java.util.concurrent.locks.Lock;
31
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.collect.Lists;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.classification.InterfaceStability;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.FileStatus;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.Stoppable;
43 import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
44 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
45 import org.apache.hadoop.hbase.util.FSUtils;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 @InterfaceAudience.Private
77 @InterfaceStability.Evolving
78 public class SnapshotFileCache implements Stoppable {
79 interface SnapshotFileInspector {
80
81
82
83
84
85 Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException;
86 }
87
88 private static final Log LOG = LogFactory.getLog(SnapshotFileCache.class);
89 private volatile boolean stop = false;
90 private final FileSystem fs;
91 private final SnapshotFileInspector fileInspector;
92 private final Path snapshotDir;
93 private final Set<String> cache = new HashSet<String>();
94
95
96
97
98 private final Map<String, SnapshotDirectoryInfo> snapshots =
99 new HashMap<String, SnapshotDirectoryInfo>();
100 private final Timer refreshTimer;
101
102 private long lastModifiedTime = Long.MIN_VALUE;
103
104
105
106
107
108
109
110
111
112
113
114
115
116 public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
117 SnapshotFileInspector inspectSnapshotFiles) throws IOException {
118 this(FSUtils.getCurrentFileSystem(conf), FSUtils.getRootDir(conf), 0, cacheRefreshPeriod,
119 refreshThreadName, inspectSnapshotFiles);
120 }
121
122
123
124
125
126
127
128
129
130
131
132 public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
133 long cacheRefreshDelay, String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) {
134 this.fs = fs;
135 this.fileInspector = inspectSnapshotFiles;
136 this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
137
138 this.refreshTimer = new Timer(refreshThreadName, true);
139 this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
140 cacheRefreshPeriod);
141 }
142
143
144
145
146
147
148
149
150
151 public void triggerCacheRefreshForTesting() {
152 try {
153 SnapshotFileCache.this.refreshCache();
154 } catch (IOException e) {
155 LOG.warn("Failed to refresh snapshot hfile cache!", e);
156 }
157 LOG.debug("Current cache:" + cache);
158 }
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181 public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
182 SnapshotManager snapshotManager) throws IOException {
183 List<FileStatus> unReferencedFiles = Lists.newArrayList();
184 List<String> snapshotsInProgress = null;
185 boolean refreshed = false;
186 Lock lock = null;
187 if (snapshotManager != null) {
188 lock = snapshotManager.getTakingSnapshotLock().writeLock();
189 }
190 if (lock == null || lock.tryLock()) {
191 try {
192 if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
193 LOG.warn("Not checking unreferenced files since snapshot is running, it will "
194 + "skip to clean the HFiles this time");
195 return unReferencedFiles;
196 }
197 for (FileStatus file : files) {
198 String fileName = file.getPath().getName();
199 if (!refreshed && !cache.contains(fileName)) {
200 refreshCache();
201 refreshed = true;
202 }
203 if (cache.contains(fileName)) {
204 continue;
205 }
206 if (snapshotsInProgress == null) {
207 snapshotsInProgress = getSnapshotsInProgress();
208 }
209 if (snapshotsInProgress.contains(fileName)) {
210 continue;
211 }
212 unReferencedFiles.add(file);
213 }
214 } finally {
215 if (lock != null) {
216 lock.unlock();
217 }
218 }
219 }
220 return unReferencedFiles;
221 }
222
223 private synchronized void refreshCache() throws IOException {
224 long lastTimestamp = Long.MAX_VALUE;
225 boolean hasChanges = false;
226
227
228 try {
229 FileStatus dirStatus = fs.getFileStatus(snapshotDir);
230 lastTimestamp = dirStatus.getModificationTime();
231 hasChanges |= (lastTimestamp >= lastModifiedTime);
232 } catch (FileNotFoundException e) {
233 if (this.cache.size() > 0) {
234 LOG.error("Snapshot directory: " + snapshotDir + " doesn't exist");
235 }
236 return;
237 }
238
239
240
241 try {
242 Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
243 FileStatus tempDirStatus = fs.getFileStatus(snapshotTmpDir);
244 lastTimestamp = Math.min(lastTimestamp, tempDirStatus.getModificationTime());
245 hasChanges |= (lastTimestamp >= lastModifiedTime);
246 if (!hasChanges) {
247 FileStatus[] tmpSnapshots = FSUtils.listStatus(fs, snapshotDir);
248 if (tmpSnapshots != null) {
249 for (FileStatus dirStatus: tmpSnapshots) {
250 lastTimestamp = Math.min(lastTimestamp, dirStatus.getModificationTime());
251 }
252 hasChanges |= (lastTimestamp >= lastModifiedTime);
253 }
254 }
255 } catch (FileNotFoundException e) {
256
257 }
258
259
260 if (!hasChanges) {
261 return;
262 }
263
264
265
266
267
268
269
270 this.lastModifiedTime = lastTimestamp;
271
272
273 this.cache.clear();
274 Map<String, SnapshotDirectoryInfo> known = new HashMap<String, SnapshotDirectoryInfo>();
275
276
277 FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir);
278 if (snapshots == null) {
279
280 if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
281 LOG.debug("No snapshots on-disk, cache empty");
282 }
283 this.snapshots.clear();
284 return;
285 }
286
287
288 for (FileStatus snapshot : snapshots) {
289 String name = snapshot.getPath().getName();
290
291 if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
292 SnapshotDirectoryInfo files = this.snapshots.remove(name);
293
294
295
296
297
298 if (files == null || files.hasBeenModified(snapshot.getModificationTime())) {
299
300 Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshot.getPath());
301 files = new SnapshotDirectoryInfo(snapshot.getModificationTime(), storedFiles);
302 }
303
304 this.cache.addAll(files.getFiles());
305 known.put(name, files);
306 }
307 }
308
309
310 this.snapshots.clear();
311 this.snapshots.putAll(known);
312 }
313
314 @VisibleForTesting
315 List<String> getSnapshotsInProgress() throws IOException {
316 List<String> snapshotInProgress = Lists.newArrayList();
317
318 Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
319
320 FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir);
321 if (running != null) {
322 for (FileStatus run : running) {
323 try {
324 snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath()));
325 } catch (CorruptedSnapshotException e) {
326
327 if (e.getCause() instanceof FileNotFoundException) {
328
329 fs.delete(run.getPath(), true);
330 LOG.warn("delete the " + run.getPath() + " due to exception:", e.getCause());
331 } else {
332 throw e;
333 }
334 }
335 }
336 }
337 return snapshotInProgress;
338 }
339
340
341
342
343 public class RefreshCacheTask extends TimerTask {
344 @Override
345 public void run() {
346 try {
347 SnapshotFileCache.this.refreshCache();
348 } catch (IOException e) {
349 LOG.warn("Failed to refresh snapshot hfile cache!", e);
350 }
351 }
352 }
353
354 @Override
355 public void stop(String why) {
356 if (!this.stop) {
357 this.stop = true;
358 this.refreshTimer.cancel();
359 }
360
361 }
362
363 @Override
364 public boolean isStopped() {
365 return this.stop;
366 }
367
368
369
370
371 private static class SnapshotDirectoryInfo {
372 long lastModified;
373 Collection<String> files;
374
375 public SnapshotDirectoryInfo(long mtime, Collection<String> files) {
376 this.lastModified = mtime;
377 this.files = files;
378 }
379
380
381
382
383 public Collection<String> getFiles() {
384 return this.files;
385 }
386
387
388
389
390
391
392
393 public boolean hasBeenModified(long mtime) {
394 return this.lastModified < mtime;
395 }
396 }
397 }