1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.FSDataInputStream;
36 import org.apache.hadoop.hbase.codec.Codec;
37 import org.apache.hadoop.hbase.io.LimitInputStream;
38 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
39 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
41 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
42 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
43 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.wal.WAL.Entry;
46
47 import com.google.protobuf.CodedInputStream;
48 import com.google.protobuf.InvalidProtocolBufferException;
49
50
51
52
53
54
55
56
57
58
59
60
61 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
62 HBaseInterfaceAudience.CONFIG})
63 public class ProtobufLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);

/**
 * Magic marker for protobuf WAL files. Its length is the stream offset at which this reader
 * expects to find the WAL header.
 */
@InterfaceAudience.Private
public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");

/** Magic marker this reader looks for in the last bytes of a file that carries a trailer. */
@InterfaceAudience.Private
public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");

/** Configuration key for the trailer size above which a warning is logged. */
static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
/** Default for {@link #WAL_TRAILER_WARN_SIZE}: 1024 * 1024 bytes. */
static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024;

/** Stream the WAL is read from; {@code null} after {@link #close()}. */
protected FSDataInputStream inputStream;
/** Decoder used to read the cells that follow each WAL key. */
protected Codec.Decoder cellDecoder;
/** Set only when the header declared compression. */
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
/** Whether the WAL header declared compression. */
protected boolean hasCompression = false;
/** Whether the WAL header declared tag compression. */
protected boolean hasTagCompression = false;

/** Offset at which edits end: the trailer position when a trailer exists, else the file length. */
private long walEditsStopOffset;
/** Whether a trailer was found and parsed during initialization. */
private boolean trailerPresent;
/** The parsed trailer; only meaningful when {@link #trailerPresent} is {@code true}. */
protected WALTrailer trailer;

/** Trailer size (bytes) above which a warning is logged; read from the configuration. */
protected int trailerWarnSize;

/**
 * Writer class names this reader accepts in the WAL header. The reference is never reassigned,
 * so it is declared {@code final}; the list itself stays mutable, as before.
 */
private static final List<String> writerClsNames = new ArrayList<String>();
static {
  writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
}

/** Cell codec class name taken from the WAL header, or {@code null} if the header had none. */
private String codecClsName = null;
98
99 @InterfaceAudience.Private
100 public long trailerSize() {
101 if (trailerPresent) {
102
103 final long calculatedSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
104 final long expectedSize = fileLength - walEditsStopOffset;
105 if (expectedSize != calculatedSize) {
106 LOG.warn("After parsing the trailer, we expect the total footer to be "+ expectedSize +" bytes, but we calculate it as being " + calculatedSize);
107 }
108 return expectedSize;
109 } else {
110 return -1L;
111 }
112 }
113
/** Outcome of reading the WAL header. */
enum WALHdrResult {
  /** The stream ended before a header could be read. */
  EOF,
  /** The header was read and its writer class (if any) is accepted. */
  SUCCESS,
  /** The header names a writer class this reader does not accept. */
  UNKNOWN_WRITER_CLS
}
119
120
121 static class WALHdrContext {
122 WALHdrResult result;
123 String cellCodecClsName;
124
125 WALHdrContext(WALHdrResult result, String cellCodecClsName) {
126 this.result = result;
127 this.cellCodecClsName = cellCodecClsName;
128 }
129 WALHdrResult getResult() {
130 return result;
131 }
132 String getCellCodecClsName() {
133 return cellCodecClsName;
134 }
135 }
136
137 public ProtobufLogReader() {
138 super();
139 }
140
141 @Override
142 public void close() throws IOException {
143 if (this.inputStream != null) {
144 this.inputStream.close();
145 this.inputStream = null;
146 }
147 }
148
149 @Override
150 public long getPosition() throws IOException {
151 return inputStream.getPos();
152 }
153
154 @Override
155 public void reset() throws IOException {
156 String clsName = initInternal(null, false);
157 initAfterCompression(clsName);
158 }
159
160 @Override
161 public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
162 throws IOException {
163 this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
164 super.init(fs, path, conf, stream);
165 }
166
167 @Override
168 protected String initReader(FSDataInputStream stream) throws IOException {
169 return initInternal(stream, true);
170 }
171
172
173
174
175 public List<String> getWriterClsNames() {
176 return writerClsNames;
177 }
178
179
180
181
182 public String getCodecClsName() {
183 return codecClsName;
184 }
185
186 protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
187 throws IOException {
188 boolean res = builder.mergeDelimitedFrom(stream);
189 if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
190 if (builder.hasWriterClsName() &&
191 !getWriterClsNames().contains(builder.getWriterClsName())) {
192 return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
193 }
194 String clsName = null;
195 if (builder.hasCellCodecClsName()) {
196 clsName = builder.getCellCodecClsName();
197 }
198 return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
199 }
200
201 private String initInternal(FSDataInputStream stream, boolean isFirst)
202 throws IOException {
203 close();
204 long expectedPos = PB_WAL_MAGIC.length;
205 if (stream == null) {
206 stream = fs.open(path);
207 stream.seek(expectedPos);
208 }
209 if (stream.getPos() != expectedPos) {
210 throw new IOException("The stream is at invalid position: " + stream.getPos());
211 }
212
213 WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
214 WALHdrContext hdrCtxt = readHeader(builder, stream);
215 WALHdrResult walHdrRes = hdrCtxt.getResult();
216 if (walHdrRes == WALHdrResult.EOF) {
217 throw new EOFException("Couldn't read WAL PB header");
218 }
219 if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
220 throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
221 }
222 if (isFirst) {
223 WALProtos.WALHeader header = builder.build();
224 this.hasCompression = header.hasHasCompression() && header.getHasCompression();
225 this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
226 }
227 this.inputStream = stream;
228 this.walEditsStopOffset = this.fileLength;
229 long currentPosition = stream.getPos();
230 trailerPresent = setTrailerIfPresent();
231 this.seekOnFs(currentPosition);
232 if (LOG.isTraceEnabled()) {
233 LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
234 + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition);
235 }
236
237 codecClsName = hdrCtxt.getCellCodecClsName();
238
239 return hdrCtxt.getCellCodecClsName();
240 }
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259 private boolean setTrailerIfPresent() {
260 try {
261 long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
262 if (trailerSizeOffset <= 0) return false;
263 this.seekOnFs(trailerSizeOffset);
264
265 int trailerSize = this.inputStream.readInt();
266 ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
267 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
268 if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
269 LOG.trace("No trailer found.");
270 return false;
271 }
272 if (trailerSize < 0) {
273 LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
274 return false;
275 } else if (trailerSize > this.trailerWarnSize) {
276
277 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
278 + trailerSize + " > " + this.trailerWarnSize);
279 }
280
281 long positionOfTrailer = trailerSizeOffset - trailerSize;
282 this.seekOnFs(positionOfTrailer);
283
284 buf = ByteBuffer.allocate(trailerSize);
285 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
286 trailer = WALTrailer.parseFrom(buf.array());
287 this.walEditsStopOffset = positionOfTrailer;
288 return true;
289 } catch (IOException ioe) {
290 LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
291 }
292 return false;
293 }
294
295 protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
296 CompressionContext compressionContext) throws IOException {
297 return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
298 }
299
300 @Override
301 protected void initAfterCompression() throws IOException {
302 initAfterCompression(null);
303 }
304
305 @Override
306 protected void initAfterCompression(String cellCodecClsName) throws IOException {
307 WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
308 this.cellDecoder = codec.getDecoder(this.inputStream);
309 if (this.hasCompression) {
310 this.byteStringUncompressor = codec.getByteStringUncompressor();
311 }
312 }
313
314 @Override
315 protected boolean hasCompression() {
316 return this.hasCompression;
317 }
318
319 @Override
320 protected boolean hasTagCompression() {
321 return this.hasTagCompression;
322 }
323
324 @Override
325 protected boolean readNext(Entry entry) throws IOException {
326 while (true) {
327
328 long originalPosition = this.inputStream.getPos();
329 if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
330 if (LOG.isTraceEnabled()) {
331 LOG.trace("Reached end of expected edits area at offset " + originalPosition);
332 }
333 return false;
334 }
335 WALKey.Builder builder = WALKey.newBuilder();
336 long size = 0;
337 boolean resetPosition = false;
338 try {
339 long available = -1;
340 try {
341 int firstByte = this.inputStream.read();
342 if (firstByte == -1) {
343 throw new EOFException("First byte is negative at offset " + originalPosition);
344 }
345 size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
346
347 available = this.inputStream.available();
348 if (available > 0 && available < size) {
349 throw new EOFException("Available stream not enough for edit, " +
350 "inputStream.available()= " + this.inputStream.available() + ", " +
351 "entry size= " + size + " at offset = " + this.inputStream.getPos());
352 }
353 ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
354 (int)size);
355 } catch (InvalidProtocolBufferException ipbe) {
356 resetPosition = true;
357 throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
358 originalPosition + ", currentPosition=" + this.inputStream.getPos() +
359 ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
360 }
361 if (!builder.isInitialized()) {
362
363
364 throw new EOFException("Partial PB while reading WAL, " +
365 "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
366 }
367 WALKey walKey = builder.build();
368 entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
369 if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
370 if (LOG.isTraceEnabled()) {
371 LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos());
372 }
373 seekOnFs(originalPosition);
374 return false;
375 }
376 int expectedCells = walKey.getFollowingKvCount();
377 long posBefore = this.inputStream.getPos();
378 try {
379 int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
380 if (expectedCells != actualCells) {
381 resetPosition = true;
382 throw new EOFException("Only read " + actualCells);
383 }
384 } catch (Exception ex) {
385 String posAfterStr = "<unknown>";
386 try {
387 posAfterStr = this.inputStream.getPos() + "";
388 } catch (Throwable t) {
389 if (LOG.isTraceEnabled()) {
390 LOG.trace("Error getting pos for error message - ignoring", t);
391 }
392 }
393 String message = " while reading " + expectedCells + " WAL KVs; started reading at "
394 + posBefore + " and read up to " + posAfterStr;
395 IOException realEofEx = extractHiddenEof(ex);
396 throw (EOFException) new EOFException("EOF " + message).
397 initCause(realEofEx != null ? realEofEx : ex);
398 }
399 if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
400 LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
401 + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
402 + this.walEditsStopOffset);
403 throw new EOFException("Read WALTrailer while reading WALEdits");
404 }
405 } catch (EOFException eof) {
406
407 if (originalPosition < 0) {
408 if (LOG.isTraceEnabled()) {
409 LOG.trace("Encountered a malformed edit, but can't seek back to last good position "
410 + "because originalPosition is negative. last offset="
411 + this.inputStream.getPos(), eof);
412 }
413 throw eof;
414 }
415
416 if (inputStream.getPos() == originalPosition && resetPosition) {
417 if (LOG.isTraceEnabled()) {
418 LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since "
419 + "current position and original position match at " + originalPosition);
420 }
421 seekOnFs(0);
422 } else {
423
424
425 if (LOG.isTraceEnabled()) {
426 LOG.trace("Encountered a malformed edit, seeking back to last good position in file, "
427 + "from " + inputStream.getPos()+" to " + originalPosition, eof);
428 }
429 seekOnFs(originalPosition);
430 }
431 return false;
432 }
433 return true;
434 }
435 }
436
437 private IOException extractHiddenEof(Exception ex) {
438
439
440 IOException ioEx = null;
441 if (ex instanceof EOFException) {
442 return (EOFException)ex;
443 } else if (ex instanceof IOException) {
444 ioEx = (IOException)ex;
445 } else if (ex instanceof RuntimeException
446 && ex.getCause() != null && ex.getCause() instanceof IOException) {
447 ioEx = (IOException)ex.getCause();
448 }
449 if (ioEx != null) {
450 if (ioEx.getMessage().contains("EOF")) return ioEx;
451 return null;
452 }
453 return null;
454 }
455
456 @Override
457 protected void seekOnFs(long pos) throws IOException {
458 this.inputStream.seek(pos);
459 }
460 }