1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.snapshot;
19
20 import java.io.FileNotFoundException;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.Timer;
29 import java.util.TimerTask;
30
31 import com.google.common.annotations.VisibleForTesting;
32 import com.google.common.collect.Lists;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.classification.InterfaceStability;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.hbase.Stoppable;
42 import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
43 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
44 import org.apache.hadoop.hbase.util.FSUtils;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.Private
76 @InterfaceStability.Evolving
77 public class SnapshotFileCache implements Stoppable {
78 interface SnapshotFileInspector {
79
80
81
82
83
84 Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException;
85 }
86
87 private static final Log LOG = LogFactory.getLog(SnapshotFileCache.class);
88 private volatile boolean stop = false;
89 private final FileSystem fs;
90 private final SnapshotFileInspector fileInspector;
91 private final Path snapshotDir;
92 private final Set<String> cache = new HashSet<String>();
93
94
95
96
97 private final Map<String, SnapshotDirectoryInfo> snapshots =
98 new HashMap<String, SnapshotDirectoryInfo>();
99 private final Timer refreshTimer;
100
101 private long lastModifiedTime = Long.MIN_VALUE;
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
116 SnapshotFileInspector inspectSnapshotFiles) throws IOException {
117 this(FSUtils.getCurrentFileSystem(conf), FSUtils.getRootDir(conf), 0, cacheRefreshPeriod,
118 refreshThreadName, inspectSnapshotFiles);
119 }
120
121
122
123
124
125
126
127
128
129
130
131 public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
132 long cacheRefreshDelay, String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) {
133 this.fs = fs;
134 this.fileInspector = inspectSnapshotFiles;
135 this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
136
137 this.refreshTimer = new Timer(refreshThreadName, true);
138 this.refreshTimer.scheduleAtFixedRate(new RefreshCacheTask(), cacheRefreshDelay,
139 cacheRefreshPeriod);
140 }
141
142
143
144
145
146
147
148
149
150 public void triggerCacheRefreshForTesting() {
151 try {
152 SnapshotFileCache.this.refreshCache();
153 } catch (IOException e) {
154 LOG.warn("Failed to refresh snapshot hfile cache!", e);
155 }
156 LOG.debug("Current cache:" + cache);
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180 public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files)
181 throws IOException {
182 List<FileStatus> unReferencedFiles = Lists.newArrayList();
183 List<String> snapshotsInProgress = null;
184 boolean refreshed = false;
185 for (FileStatus file : files) {
186 String fileName = file.getPath().getName();
187 if (!refreshed && !cache.contains(fileName)) {
188 refreshCache();
189 refreshed = true;
190 }
191 if (cache.contains(fileName)) {
192 continue;
193 }
194 if (snapshotsInProgress == null) {
195 snapshotsInProgress = getSnapshotsInProgress();
196 }
197 if (snapshotsInProgress.contains(fileName)) {
198 continue;
199 }
200 unReferencedFiles.add(file);
201 }
202 return unReferencedFiles;
203 }
204
205 private synchronized void refreshCache() throws IOException {
206 long lastTimestamp = Long.MAX_VALUE;
207 boolean hasChanges = false;
208
209
210 try {
211 FileStatus dirStatus = fs.getFileStatus(snapshotDir);
212 lastTimestamp = dirStatus.getModificationTime();
213 hasChanges |= (lastTimestamp >= lastModifiedTime);
214 } catch (FileNotFoundException e) {
215 if (this.cache.size() > 0) {
216 LOG.error("Snapshot directory: " + snapshotDir + " doesn't exist");
217 }
218 return;
219 }
220
221
222
223 try {
224 Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
225 FileStatus tempDirStatus = fs.getFileStatus(snapshotTmpDir);
226 lastTimestamp = Math.min(lastTimestamp, tempDirStatus.getModificationTime());
227 hasChanges |= (lastTimestamp >= lastModifiedTime);
228 if (!hasChanges) {
229 FileStatus[] tmpSnapshots = FSUtils.listStatus(fs, snapshotDir);
230 if (tmpSnapshots != null) {
231 for (FileStatus dirStatus: tmpSnapshots) {
232 lastTimestamp = Math.min(lastTimestamp, dirStatus.getModificationTime());
233 }
234 hasChanges |= (lastTimestamp >= lastModifiedTime);
235 }
236 }
237 } catch (FileNotFoundException e) {
238
239 }
240
241
242 if (!hasChanges) {
243 return;
244 }
245
246
247
248
249
250
251
252 this.lastModifiedTime = lastTimestamp;
253
254
255 this.cache.clear();
256 Map<String, SnapshotDirectoryInfo> known = new HashMap<String, SnapshotDirectoryInfo>();
257
258
259 FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir);
260 if (snapshots == null) {
261
262 if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
263 LOG.debug("No snapshots on-disk, cache empty");
264 }
265 this.snapshots.clear();
266 return;
267 }
268
269
270 for (FileStatus snapshot : snapshots) {
271 String name = snapshot.getPath().getName();
272
273 if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
274 SnapshotDirectoryInfo files = this.snapshots.remove(name);
275
276
277
278
279
280 if (files == null || files.hasBeenModified(snapshot.getModificationTime())) {
281
282 Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshot.getPath());
283 files = new SnapshotDirectoryInfo(snapshot.getModificationTime(), storedFiles);
284 }
285
286 this.cache.addAll(files.getFiles());
287 known.put(name, files);
288 }
289 }
290
291
292 this.snapshots.clear();
293 this.snapshots.putAll(known);
294 }
295
296 @VisibleForTesting List<String> getSnapshotsInProgress() throws IOException {
297 List<String> snapshotInProgress = Lists.newArrayList();
298
299 Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
300
301 FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir);
302 if (running != null) {
303 for (FileStatus run : running) {
304 try {
305 snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath()));
306 } catch (CorruptedSnapshotException e) {
307
308 if (e.getCause() instanceof FileNotFoundException) {
309
310 if (!fs.exists(new Path(run.getPath(),
311 SnapshotDescriptionUtils.SNAPSHOT_IN_PROGRESS))) {
312 fs.delete(run.getPath(), true);
313 LOG.warn("delete the " + run.getPath() + " due to exception:", e.getCause());
314 }
315 } else {
316 throw e;
317 }
318 }
319 }
320 }
321 return snapshotInProgress;
322 }
323
324
325
326
327 public class RefreshCacheTask extends TimerTask {
328 @Override
329 public void run() {
330 try {
331 SnapshotFileCache.this.refreshCache();
332 } catch (IOException e) {
333 LOG.warn("Failed to refresh snapshot hfile cache!", e);
334 }
335 }
336 }
337
338 @Override
339 public void stop(String why) {
340 if (!this.stop) {
341 this.stop = true;
342 this.refreshTimer.cancel();
343 }
344
345 }
346
347 @Override
348 public boolean isStopped() {
349 return this.stop;
350 }
351
352
353
354
355 private static class SnapshotDirectoryInfo {
356 long lastModified;
357 Collection<String> files;
358
359 public SnapshotDirectoryInfo(long mtime, Collection<String> files) {
360 this.lastModified = mtime;
361 this.files = files;
362 }
363
364
365
366
367 public Collection<String> getFiles() {
368 return this.files;
369 }
370
371
372
373
374
375
376
377 public boolean hasBeenModified(long mtime) {
378 return this.lastModified < mtime;
379 }
380 }
381 }