1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.IOException;
23 import java.lang.reflect.Field;
24 import java.util.NavigableMap;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.FSDataInputStream;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.wal.WAL.Entry;
35 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
36 import org.apache.hadoop.io.SequenceFile;
37 import org.apache.hadoop.io.SequenceFile.Metadata;
38 import org.apache.hadoop.io.Text;
39
40 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
41 HBaseInterfaceAudience.CONFIG})
42 public class SequenceFileLogReader extends ReaderBase {
43 private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
44
45
46 private static final Text WAL_VERSION_KEY = new Text("version");
47
48
49
50 private static final int COMPRESSION_VERSION = 1;
51 private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
52 private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
53
54
55
56
57
58
59
60
61
62
63
64
65
66 private static class WALReader extends SequenceFile.Reader {
67
68 WALReader(final FileSystem fs, final Path p, final Configuration c)
69 throws IOException {
70 super(fs, p, c);
71 }
72
73 @Override
74 protected FSDataInputStream openFile(FileSystem fs, Path file,
75 int bufferSize, long length)
76 throws IOException {
77 return new WALReaderFSDataInputStream(super.openFile(fs, file,
78 bufferSize, length), length);
79 }
80
81
82
83
84 static class WALReaderFSDataInputStream extends FSDataInputStream {
85 private boolean firstGetPosInvocation = true;
86 private long length;
87
88 WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
89 throws IOException {
90 super(is);
91 this.length = l;
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 @Override
110 public long getPos() throws IOException {
111 if (this.firstGetPosInvocation) {
112 this.firstGetPosInvocation = false;
113 long adjust = 0;
114 HdfsDataInputStream hdfsDataInputStream = null;
115 try {
116 if (this.in.getClass().getName().endsWith("HdfsDataInputStream")
117 || this.in.getClass().getName().endsWith("DFSInputStream")) {
118 hdfsDataInputStream = (HdfsDataInputStream) this.getWrappedStream();
119 long realLength = hdfsDataInputStream.getVisibleLength();
120 assert(realLength >= this.length);
121 adjust = realLength - this.length;
122 } else {
123 LOG.info(
124 "Input stream class: " + this.in.getClass().getName() + ", not adjusting length");
125 }
126 } catch (Exception e) {
127 LOG.warn("Error while trying to get accurate file length. "
128 + "Truncation / data loss may occur if RegionServers die.",
129 e);
130 throw new IOException(e);
131 }
132 return adjust + super.getPos();
133 }
134 return super.getPos();
135 }
136 }
137 }
138
139
140 protected SequenceFile.Reader reader;
141 long entryStart = 0;
142
143 public SequenceFileLogReader() {
144 super();
145 }
146
147 @Override
148 public void close() throws IOException {
149 try {
150 if (reader != null) {
151 this.reader.close();
152 this.reader = null;
153 }
154 } catch (IOException ioe) {
155 throw addFileInfoToException(ioe);
156 }
157 }
158
159 @Override
160 public long getPosition() throws IOException {
161 return reader != null ? reader.getPosition() : 0;
162 }
163
164 @Override
165 public void reset() throws IOException {
166
167
168 reader = new WALReader(fs, path, conf);
169 }
170
171 @Override
172 protected String initReader(FSDataInputStream stream) throws IOException {
173
174 if (stream != null) {
175 stream.close();
176 }
177 reset();
178 return null;
179 }
180
181 @Override
182 protected void initAfterCompression(String cellCodecClsName) throws IOException {
183
184 }
185
186 @Override
187 protected void initAfterCompression() throws IOException {
188
189 }
190
191 @Override
192 protected boolean hasCompression() {
193 return isWALCompressionEnabled(reader.getMetadata());
194 }
195
196 @Override
197 protected boolean hasTagCompression() {
198
199 return false;
200 }
201
202
203
204
205
206 static boolean isWALCompressionEnabled(final Metadata metadata) {
207
208 Text txt = metadata.get(WAL_VERSION_KEY);
209 if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
210 return false;
211 }
212
213 txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
214 return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
215 }
216
217
218
219
220
221
222
223
224 @Override
225 protected boolean readNext(Entry e) throws IOException {
226 try {
227 if (!(e.getKey() instanceof HLogKey)) {
228 final IllegalArgumentException exception = new IllegalArgumentException(
229 "SequenceFileLogReader only works when given entries that have HLogKey for keys. This" +
230 " one had '" + e.getKey().getClass() + "'");
231 LOG.error("We need to use the legacy SequenceFileLogReader to handle a " +
232 " pre-0.96 style WAL, but HBase internals failed to use the deprecated HLogKey class." +
233 " This is a bug; please file an issue or email the developer mailing list. You will " +
234 "need the following exception details when seeking help from the HBase community.",
235 exception);
236 throw exception;
237 }
238 boolean hasNext = this.reader.next((HLogKey)e.getKey(), e.getEdit());
239 if (!hasNext) return false;
240
241 NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
242 if (scopes != null) {
243 e.getKey().readOlderScopes(scopes);
244 }
245 return true;
246 } catch (IOException ioe) {
247 throw addFileInfoToException(ioe);
248 }
249 }
250
251 @Override
252 protected void seekOnFs(long pos) throws IOException {
253 try {
254 reader.seek(pos);
255 } catch (IOException ioe) {
256 throw addFileInfoToException(ioe);
257 }
258 }
259
260 protected IOException addFileInfoToException(final IOException ioe)
261 throws IOException {
262 long pos = -1;
263 try {
264 pos = getPosition();
265 } catch (IOException e) {
266 LOG.warn("Failed getting position to add to throw", e);
267 }
268
269
270 long end = Long.MAX_VALUE;
271 try {
272 Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
273 fEnd.setAccessible(true);
274 end = fEnd.getLong(this.reader);
275 } catch(NoSuchFieldException nfe) {
276
277 if (LOG.isTraceEnabled()) LOG.trace(nfe);
278 } catch(IllegalAccessException iae) {
279
280 if (LOG.isTraceEnabled()) LOG.trace(iae);
281 } catch(Exception e) {
282
283 LOG.warn("Unexpected exception when accessing the end field", e);
284 }
285
286 String msg = (this.path == null? "": this.path.toString()) +
287 ", entryStart=" + entryStart + ", pos=" + pos +
288 ((end == Long.MAX_VALUE) ? "" : ", end=" + end) +
289 ", edit=" + this.edit;
290
291
292 try {
293 return (IOException) ioe.getClass()
294 .getConstructor(String.class)
295 .newInstance(msg)
296 .initCause(ioe);
297 } catch(NoSuchMethodException nfe) {
298
299 if (LOG.isTraceEnabled()) LOG.trace(nfe);
300 } catch(IllegalAccessException iae) {
301
302 if (LOG.isTraceEnabled()) LOG.trace(iae);
303 } catch(Exception e) {
304
305 LOG.warn("Unexpected exception when accessing the end field", e);
306 }
307 return ioe;
308 }
309 }