1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.FilterInputStream;
23 import java.io.IOException;
24 import java.lang.reflect.Field;
25 import java.lang.reflect.Method;
26 import java.util.NavigableMap;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FSDataInputStream;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
36 import org.apache.hadoop.hbase.wal.WAL.Entry;
37 import org.apache.hadoop.io.SequenceFile;
38 import org.apache.hadoop.io.SequenceFile.Metadata;
39 import org.apache.hadoop.io.Text;
40
41 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
42 public class SequenceFileLogReader extends ReaderBase {
43 private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
44
45
46 private static final Text WAL_VERSION_KEY = new Text("version");
47
48
49
50 private static final int COMPRESSION_VERSION = 1;
51 private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
52 private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
53
54
55
56
57
58
59
60
61
62
63
64
65
66 private static class WALReader extends SequenceFile.Reader {
67
68 WALReader(final FileSystem fs, final Path p, final Configuration c)
69 throws IOException {
70 super(fs, p, c);
71 }
72
73 @Override
74 protected FSDataInputStream openFile(FileSystem fs, Path file,
75 int bufferSize, long length)
76 throws IOException {
77 return new WALReaderFSDataInputStream(super.openFile(fs, file,
78 bufferSize, length), length);
79 }
80
81
82
83
84 static class WALReaderFSDataInputStream extends FSDataInputStream {
85 private boolean firstGetPosInvocation = true;
86 private long length;
87
88 WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
89 throws IOException {
90 super(is);
91 this.length = l;
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 @Override
110 public long getPos() throws IOException {
111 if (this.firstGetPosInvocation) {
112 this.firstGetPosInvocation = false;
113 long adjust = 0;
114
115 try {
116 Field fIn = FilterInputStream.class.getDeclaredField("in");
117 fIn.setAccessible(true);
118 Object realIn = fIn.get(this.in);
119
120
121 if (realIn.getClass().getName().endsWith("DFSInputStream")) {
122 Method getFileLength = realIn.getClass().
123 getDeclaredMethod("getFileLength", new Class<?> []{});
124 getFileLength.setAccessible(true);
125 long realLength = ((Long)getFileLength.
126 invoke(realIn, new Object []{})).longValue();
127 assert(realLength >= this.length);
128 adjust = realLength - this.length;
129 } else {
130 LOG.info("Input stream class: " + realIn.getClass().getName() +
131 ", not adjusting length");
132 }
133 } catch(Exception e) {
134 SequenceFileLogReader.LOG.warn(
135 "Error while trying to get accurate file length. " +
136 "Truncation / data loss may occur if RegionServers die.", e);
137 throw new IOException(e);
138 }
139
140 return adjust + super.getPos();
141 }
142 return super.getPos();
143 }
144 }
145 }
146
147
148 protected SequenceFile.Reader reader;
149 long entryStart = 0;
150
151 public SequenceFileLogReader() {
152 super();
153 }
154
155 @Override
156 public void close() throws IOException {
157 try {
158 if (reader != null) {
159 this.reader.close();
160 this.reader = null;
161 }
162 } catch (IOException ioe) {
163 throw addFileInfoToException(ioe);
164 }
165 }
166
167 @Override
168 public long getPosition() throws IOException {
169 return reader != null ? reader.getPosition() : 0;
170 }
171
172 @Override
173 public void reset() throws IOException {
174
175
176 reader = new WALReader(fs, path, conf);
177 }
178
179 @Override
180 protected String initReader(FSDataInputStream stream) throws IOException {
181
182 if (stream != null) {
183 stream.close();
184 }
185 reset();
186 return null;
187 }
188
189 @Override
190 protected void initAfterCompression(String cellCodecClsName) throws IOException {
191
192 }
193
194 @Override
195 protected void initAfterCompression() throws IOException {
196
197 }
198
199 @Override
200 protected boolean hasCompression() {
201 return isWALCompressionEnabled(reader.getMetadata());
202 }
203
204 @Override
205 protected boolean hasTagCompression() {
206
207 return false;
208 }
209
210
211
212
213
214 static boolean isWALCompressionEnabled(final Metadata metadata) {
215
216 Text txt = metadata.get(WAL_VERSION_KEY);
217 if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
218 return false;
219 }
220
221 txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
222 return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
223 }
224
225
226
227
228
229
230
231
232 @Override
233 protected boolean readNext(Entry e) throws IOException {
234 try {
235 if (!(e.getKey() instanceof HLogKey)) {
236 final IllegalArgumentException exception = new IllegalArgumentException(
237 "SequenceFileLogReader only works when given entries that have HLogKey for keys. This" +
238 " one had '" + e.getKey().getClass() + "'");
239 LOG.error("We need to use the legacy SequenceFileLogReader to handle a " +
240 " pre-0.96 style WAL, but HBase internals failed to use the deprecated HLogKey class." +
241 " This is a bug; please file an issue or email the developer mailing list. You will " +
242 "need the following exception details when seeking help from the HBase community.",
243 exception);
244 throw exception;
245 }
246 boolean hasNext = this.reader.next((HLogKey)e.getKey(), e.getEdit());
247 if (!hasNext) return false;
248
249 NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
250 if (scopes != null) {
251 e.getKey().readOlderScopes(scopes);
252 }
253 return true;
254 } catch (IOException ioe) {
255 throw addFileInfoToException(ioe);
256 }
257 }
258
259 @Override
260 protected void seekOnFs(long pos) throws IOException {
261 try {
262 reader.seek(pos);
263 } catch (IOException ioe) {
264 throw addFileInfoToException(ioe);
265 }
266 }
267
268 protected IOException addFileInfoToException(final IOException ioe)
269 throws IOException {
270 long pos = -1;
271 try {
272 pos = getPosition();
273 } catch (IOException e) {
274 LOG.warn("Failed getting position to add to throw", e);
275 }
276
277
278 long end = Long.MAX_VALUE;
279 try {
280 Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
281 fEnd.setAccessible(true);
282 end = fEnd.getLong(this.reader);
283 } catch(NoSuchFieldException nfe) {
284
285 } catch(IllegalAccessException iae) {
286
287 } catch(Exception e) {
288
289 LOG.warn("Unexpected exception when accessing the end field", e);
290 }
291
292 String msg = (this.path == null? "": this.path.toString()) +
293 ", entryStart=" + entryStart + ", pos=" + pos +
294 ((end == Long.MAX_VALUE) ? "" : ", end=" + end) +
295 ", edit=" + this.edit;
296
297
298 try {
299 return (IOException) ioe.getClass()
300 .getConstructor(String.class)
301 .newInstance(msg)
302 .initCause(ioe);
303 } catch(NoSuchMethodException nfe) {
304
305 } catch(IllegalAccessException iae) {
306
307 } catch(Exception e) {
308
309 LOG.warn("Unexpected exception when accessing the end field", e);
310 }
311 return ioe;
312 }
313 }