1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.fs.FSDataInputStream;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
32 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
33 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
34
35
36
37
38 @InterfaceAudience.Private
39 @InterfaceStability.Evolving
40 public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
41 private static final Log LOG = LogFactory.getLog(ProcedureWALFile.class);
42
43 private ProcedureWALHeader header;
44 private FSDataInputStream stream;
45 private FileStatus logStatus;
46 private FileSystem fs;
47 private Path logFile;
48 private long startPos;
49 private long minProcId;
50 private long maxProcId;
51
52 public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
53 this.fs = fs;
54 this.logStatus = logStatus;
55 this.logFile = logStatus.getPath();
56 }
57
58 public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, long startPos) {
59 this.fs = fs;
60 this.logFile = logFile;
61 this.header = header;
62 this.startPos = startPos;
63 }
64
65 public void open() throws IOException {
66 if (stream == null) {
67 stream = fs.open(logFile);
68 }
69
70 if (header == null) {
71 header = ProcedureWALFormat.readHeader(stream);
72 startPos = stream.getPos();
73 } else {
74 stream.seek(startPos);
75 }
76 }
77
78 public ProcedureWALTrailer readTrailer() throws IOException {
79 try {
80 return ProcedureWALFormat.readTrailer(stream, startPos, logStatus.getLen());
81 } finally {
82 stream.seek(startPos);
83 }
84 }
85
86 public void readTracker(ProcedureStoreTracker tracker) throws IOException {
87 ProcedureWALTrailer trailer = readTrailer();
88 try {
89 stream.seek(trailer.getTrackerPos());
90 tracker.readFrom(stream);
91 } finally {
92 stream.seek(startPos);
93 }
94 }
95
96 public void close() {
97 if (stream == null) return;
98 try {
99 stream.close();
100 } catch (IOException e) {
101 LOG.warn("unable to close the wal file: " + logFile, e);
102 } finally {
103 stream = null;
104 }
105 }
106
107 public FSDataInputStream getStream() {
108 return stream;
109 }
110
111 public ProcedureWALHeader getHeader() {
112 return header;
113 }
114
115 public boolean isCompacted() {
116 return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
117 }
118
119 public long getLogId() {
120 return header.getLogId();
121 }
122
123 public long getSize() {
124 return logStatus.getLen();
125 }
126
127 public void removeFile() throws IOException {
128 close();
129 fs.delete(logFile, false);
130 }
131
132 public void setProcIds(long minId, long maxId) {
133 this.minProcId = minId;
134 this.maxProcId = maxId;
135 }
136
137 public long getMinProcId() {
138 return minProcId;
139 }
140
141 public long getMaxProcId() {
142 return maxProcId;
143 }
144
145 @Override
146 public int compareTo(final ProcedureWALFile other) {
147 long diff = header.getLogId() - other.header.getLogId();
148 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
149 }
150
151 @Override
152 public boolean equals(Object o) {
153 if (this == o) return true;
154 if (!(o instanceof ProcedureWALFile)) return false;
155 return compareTo((ProcedureWALFile)o) == 0;
156 }
157
158 @Override
159 public int hashCode() {
160 return logFile.hashCode();
161 }
162
163 @Override
164 public String toString() {
165 return logFile.toString();
166 }
167 }