1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io;
20
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collection;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.FileNotFoundException;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.fs.FSDataInputStream;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.FileStatus;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.fs.PositionedReadable;
37 import org.apache.hadoop.fs.Seekable;
38 import org.apache.hadoop.hbase.util.FSUtils;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 @InterfaceAudience.Private
91 public class FileLink {
92 private static final Log LOG = LogFactory.getLog(FileLink.class);
93
94
95 public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
96
97
98
99
100
101 private static class FileLinkInputStream extends InputStream
102 implements Seekable, PositionedReadable {
103 private FSDataInputStream in = null;
104 private Path currentPath = null;
105 private long pos = 0;
106
107 private final FileLink fileLink;
108 private final int bufferSize;
109 private final FileSystem fs;
110
111 public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
112 throws IOException {
113 this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
114 }
115
116 public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
117 throws IOException {
118 this.bufferSize = bufferSize;
119 this.fileLink = fileLink;
120 this.fs = fs;
121
122 this.in = tryOpen();
123 }
124
125 @Override
126 public int read() throws IOException {
127 int res;
128 try {
129 res = in.read();
130 } catch (FileNotFoundException e) {
131 res = tryOpen().read();
132 } catch (NullPointerException e) {
133 res = tryOpen().read();
134 } catch (AssertionError e) {
135 res = tryOpen().read();
136 }
137 if (res > 0) pos += 1;
138 return res;
139 }
140
141 @Override
142 public int read(byte[] b) throws IOException {
143 return read(b, 0, b.length);
144 }
145
146 @Override
147 public int read(byte[] b, int off, int len) throws IOException {
148 int n;
149 try {
150 n = in.read(b, off, len);
151 } catch (FileNotFoundException e) {
152 n = tryOpen().read(b, off, len);
153 } catch (NullPointerException e) {
154 n = tryOpen().read(b, off, len);
155 } catch (AssertionError e) {
156 n = tryOpen().read(b, off, len);
157 }
158 if (n > 0) pos += n;
159 assert(in.getPos() == pos);
160 return n;
161 }
162
163 @Override
164 public int read(long position, byte[] buffer, int offset, int length) throws IOException {
165 int n;
166 try {
167 n = in.read(position, buffer, offset, length);
168 } catch (FileNotFoundException e) {
169 n = tryOpen().read(position, buffer, offset, length);
170 } catch (NullPointerException e) {
171 n = tryOpen().read(position, buffer, offset, length);
172 } catch (AssertionError e) {
173 n = tryOpen().read(position, buffer, offset, length);
174 }
175 return n;
176 }
177
178 @Override
179 public void readFully(long position, byte[] buffer) throws IOException {
180 readFully(position, buffer, 0, buffer.length);
181 }
182
183 @Override
184 public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
185 try {
186 in.readFully(position, buffer, offset, length);
187 } catch (FileNotFoundException e) {
188 tryOpen().readFully(position, buffer, offset, length);
189 } catch (NullPointerException e) {
190 tryOpen().readFully(position, buffer, offset, length);
191 } catch (AssertionError e) {
192 tryOpen().readFully(position, buffer, offset, length);
193 }
194 }
195
196 @Override
197 public long skip(long n) throws IOException {
198 long skipped;
199
200 try {
201 skipped = in.skip(n);
202 } catch (FileNotFoundException e) {
203 skipped = tryOpen().skip(n);
204 } catch (NullPointerException e) {
205 skipped = tryOpen().skip(n);
206 } catch (AssertionError e) {
207 skipped = tryOpen().skip(n);
208 }
209
210 if (skipped > 0) pos += skipped;
211 return skipped;
212 }
213
214 @Override
215 public int available() throws IOException {
216 try {
217 return in.available();
218 } catch (FileNotFoundException e) {
219 return tryOpen().available();
220 } catch (NullPointerException e) {
221 return tryOpen().available();
222 } catch (AssertionError e) {
223 return tryOpen().available();
224 }
225 }
226
227 @Override
228 public void seek(long pos) throws IOException {
229 try {
230 in.seek(pos);
231 } catch (FileNotFoundException e) {
232 tryOpen().seek(pos);
233 } catch (NullPointerException e) {
234 tryOpen().seek(pos);
235 } catch (AssertionError e) {
236 tryOpen().seek(pos);
237 }
238 this.pos = pos;
239 }
240
241 @Override
242 public long getPos() throws IOException {
243 return pos;
244 }
245
246 @Override
247 public boolean seekToNewSource(long targetPos) throws IOException {
248 boolean res;
249 try {
250 res = in.seekToNewSource(targetPos);
251 } catch (FileNotFoundException e) {
252 res = tryOpen().seekToNewSource(targetPos);
253 } catch (NullPointerException e) {
254 res = tryOpen().seekToNewSource(targetPos);
255 } catch (AssertionError e) {
256 res = tryOpen().seekToNewSource(targetPos);
257 }
258 if (res) pos = targetPos;
259 return res;
260 }
261
262 @Override
263 public void close() throws IOException {
264 in.close();
265 }
266
267 @Override
268 public synchronized void mark(int readlimit) {
269 }
270
271 @Override
272 public synchronized void reset() throws IOException {
273 throw new IOException("mark/reset not supported");
274 }
275
276 @Override
277 public boolean markSupported() {
278 return false;
279 }
280
281
282
283
284
285
286
287 private FSDataInputStream tryOpen() throws IOException {
288 for (Path path: fileLink.getLocations()) {
289 if (path.equals(currentPath)) continue;
290 try {
291 in = fs.open(path, bufferSize);
292 if (pos != 0) in.seek(pos);
293 assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
294 if (LOG.isTraceEnabled()) {
295 if (currentPath == null) {
296 LOG.debug("link open path=" + path);
297 } else {
298 LOG.trace("link switch from path=" + currentPath + " to path=" + path);
299 }
300 }
301 currentPath = path;
302 return(in);
303 } catch (FileNotFoundException e) {
304
305 }
306 }
307 throw new FileNotFoundException("Unable to open link: " + fileLink);
308 }
309 }
310
311 private Path[] locations = null;
312
313 protected FileLink() {
314 this.locations = null;
315 }
316
317
318
319
320
321 public FileLink(Path originPath, Path... alternativePaths) {
322 setLocations(originPath, alternativePaths);
323 }
324
325
326
327
328 public FileLink(final Collection<Path> locations) {
329 this.locations = locations.toArray(new Path[locations.size()]);
330 }
331
332
333
334
335 public Path[] getLocations() {
336 return locations;
337 }
338
339 @Override
340 public String toString() {
341 StringBuilder str = new StringBuilder(getClass().getName());
342 str.append(" locations=[");
343 for (int i = 0; i < locations.length; ++i) {
344 if (i > 0) str.append(", ");
345 str.append(locations[i].toString());
346 }
347 str.append("]");
348 return str.toString();
349 }
350
351
352
353
354 public boolean exists(final FileSystem fs) throws IOException {
355 for (int i = 0; i < locations.length; ++i) {
356 if (fs.exists(locations[i])) {
357 return true;
358 }
359 }
360 return false;
361 }
362
363
364
365
366 public Path getAvailablePath(FileSystem fs) throws IOException {
367 for (int i = 0; i < locations.length; ++i) {
368 if (fs.exists(locations[i])) {
369 return locations[i];
370 }
371 }
372 throw new FileNotFoundException("Unable to open link: " + this);
373 }
374
375
376
377
378
379
380
381
382 public FileStatus getFileStatus(FileSystem fs) throws IOException {
383 for (int i = 0; i < locations.length; ++i) {
384 try {
385 return fs.getFileStatus(locations[i]);
386 } catch (FileNotFoundException e) {
387
388 }
389 }
390 throw new FileNotFoundException("Unable to open link: " + this);
391 }
392
393
394
395
396
397
398
399
400
401
402
403 public FSDataInputStream open(final FileSystem fs) throws IOException {
404 return new FSDataInputStream(new FileLinkInputStream(fs, this));
405 }
406
407
408
409
410
411
412
413
414
415
416
417
418 public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
419 return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
420 }
421
422
423
424
425
426 protected void setLocations(Path originPath, Path... alternativePaths) {
427 assert this.locations == null : "Link locations already set";
428
429 List<Path> paths = new ArrayList<Path>(alternativePaths.length +1);
430 if (originPath != null) {
431 paths.add(originPath);
432 }
433
434 for (int i = 0; i < alternativePaths.length; i++) {
435 if (alternativePaths[i] != null) {
436 paths.add(alternativePaths[i]);
437 }
438 }
439 this.locations = paths.toArray(new Path[0]);
440 }
441
442
443
444
445
446
447
448
449
450
451
452 public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
453 return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
454 }
455
456
457
458
459
460
461
462 public static String getBackReferenceFileName(final Path dirPath) {
463 return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
464 }
465
466
467
468
469
470
471
472 public static boolean isBackReferencesDir(final Path dirPath) {
473 if (dirPath == null) return false;
474 return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
475 }
476
477 @Override
478 public boolean equals(Object obj) {
479 if (obj == null) {
480 return false;
481 }
482
483
484
485 if (this.getClass().equals(obj.getClass())) {
486 return Arrays.equals(this.locations, ((FileLink) obj).locations);
487 }
488
489 return false;
490 }
491
492 @Override
493 public int hashCode() {
494 return Arrays.hashCode(locations);
495 }
496 }
497