001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.EOFException;
021import java.io.IOException;
022import java.io.InputStream;
023import org.apache.hadoop.fs.FSDataInputStream;
024import org.apache.hadoop.fs.FileStatus;
025import org.apache.hadoop.hbase.io.DelegatingInputStream;
026import org.apache.hadoop.hbase.io.util.StreamUtils;
027import org.apache.hadoop.hbase.util.Pair;
028import org.apache.hadoop.hbase.wal.WAL.Entry;
029import org.apache.hadoop.hbase.wal.WALTailingReader;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
035import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
037
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
040
041/**
042 * A WAL reader for replication. It supports reset so can be used to tail a WAL file which is being
043 * written currently.
044 */
045@InterfaceAudience.Private
046public class ProtobufWALTailingReader extends AbstractProtobufWALReader
047  implements WALTailingReader {
048
049  private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class);
050
051  private DelegatingInputStream delegatingInput;
052
053  private static final class ReadWALKeyResult {
054    final State state;
055    final Entry entry;
056    final int followingKvCount;
057
058    public ReadWALKeyResult(State state, Entry entry, int followingKvCount) {
059      this.state = state;
060      this.entry = entry;
061      this.followingKvCount = followingKvCount;
062    }
063  }
064
065  private static final ReadWALKeyResult KEY_ERROR_AND_RESET =
066    new ReadWALKeyResult(State.ERROR_AND_RESET, null, 0);
067
068  private static final ReadWALKeyResult KEY_EOF_AND_RESET =
069    new ReadWALKeyResult(State.EOF_AND_RESET, null, 0);
070
071  private IOException unwrapIPBE(IOException e) {
072    if (e instanceof InvalidProtocolBufferException) {
073      return ((InvalidProtocolBufferException) e).unwrapIOException();
074    } else {
075      return e;
076    }
077  }
078
079  private ReadWALKeyResult readWALKey(long originalPosition) {
080    int firstByte;
081    try {
082      firstByte = delegatingInput.read();
083    } catch (IOException e) {
084      LOG.warn("Failed to read wal key length first byte", e);
085      return KEY_ERROR_AND_RESET;
086    }
087    if (firstByte == -1) {
088      return KEY_EOF_AND_RESET;
089    }
090    int size;
091    try {
092      size = CodedInputStream.readRawVarint32(firstByte, delegatingInput);
093    } catch (IOException e) {
094      // if we are reading a partial WALTrailer, the size will just be 0 so we will not get an
095      // exception here, so do not need to check whether it is a partial WALTrailer.
096      if (
097        e instanceof InvalidProtocolBufferException
098          && ProtobufUtil.isEOF((InvalidProtocolBufferException) e)
099      ) {
100        LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}",
101          originalPosition, getPositionQuietly(), e.toString());
102        return KEY_EOF_AND_RESET;
103      } else {
104        LOG.warn("Failed to read wal key length", e);
105        return KEY_ERROR_AND_RESET;
106      }
107    }
108    if (size < 0) {
109      LOG.warn("Negative pb message size read: {}, malformed WAL file?", size);
110      return KEY_ERROR_AND_RESET;
111    }
112    int available;
113    try {
114      available = delegatingInput.available();
115    } catch (IOException e) {
116      LOG.warn("Failed to get available bytes", e);
117      return KEY_ERROR_AND_RESET;
118    }
119    if (available > 0 && available < size) {
120      LOG.info(
121        "Available stream not enough for edit, available={}, " + "entry size={} at offset={}",
122        available, size, getPositionQuietly());
123      return KEY_EOF_AND_RESET;
124    }
125    WALProtos.WALKey walKey;
126    try {
127      if (available > 0) {
128        walKey = WALProtos.WALKey.parseFrom(ByteStreams.limit(delegatingInput, size));
129      } else {
130        byte[] content = new byte[size];
131        ByteStreams.readFully(delegatingInput, content);
132        walKey = WALProtos.WALKey.parseFrom(content);
133      }
134    } catch (IOException e) {
135      e = unwrapIPBE(e);
136      if (
137        e instanceof EOFException || (e instanceof InvalidProtocolBufferException
138          && ProtobufUtil.isEOF((InvalidProtocolBufferException) e))
139      ) {
140        LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}",
141          originalPosition, getPositionQuietly(), e.toString());
142        return KEY_EOF_AND_RESET;
143      } else {
144        boolean isWALTrailer;
145        try {
146          isWALTrailer = isWALTrailer(originalPosition);
147        } catch (IOException ioe) {
148          LOG.warn("Error while testing whether this is a partial WAL trailer, originalPosition={},"
149            + " currentPosition={}", originalPosition, getPositionQuietly(), e);
150          return KEY_ERROR_AND_RESET;
151        }
152        if (isWALTrailer) {
153          LOG.info("Reached partial WAL Trailer(EOF) while reading WALKey, originalPosition={},"
154            + " currentPosition={}", originalPosition, getPositionQuietly(), e);
155          return KEY_EOF_AND_RESET;
156        } else {
157          // for all other type of IPBEs or IOEs, it means the WAL key is broken
158          LOG.warn("Error while reading WALKey, originalPosition={}, currentPosition={}",
159            originalPosition, getPositionQuietly(), e);
160          return KEY_ERROR_AND_RESET;
161        }
162      }
163    }
164    Entry entry = new Entry();
165    try {
166      entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor);
167    } catch (IOException e) {
168      LOG.warn("Failed to read wal key fields from pb message", e);
169      return KEY_ERROR_AND_RESET;
170    }
171    return new ReadWALKeyResult(State.NORMAL, entry,
172      walKey.hasFollowingKvCount() ? walKey.getFollowingKvCount() : 0);
173  }
174
175  private Result editEof() {
176    return hasCompression
177      ? State.EOF_AND_RESET_COMPRESSION.getResult()
178      : State.EOF_AND_RESET.getResult();
179  }
180
181  private Result editError() {
182    return hasCompression
183      ? State.ERROR_AND_RESET_COMPRESSION.getResult()
184      : State.ERROR_AND_RESET.getResult();
185  }
186
187  private Result readWALEdit(Entry entry, int followingKvCount) {
188    long posBefore;
189    try {
190      posBefore = inputStream.getPos();
191    } catch (IOException e) {
192      LOG.warn("failed to get position", e);
193      return State.ERROR_AND_RESET.getResult();
194    }
195    if (followingKvCount == 0) {
196      LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
197        posBefore);
198      return new Result(State.NORMAL, entry, posBefore);
199    }
200    int actualCells;
201    try {
202      actualCells = entry.getEdit().readFromCells(cellDecoder, followingKvCount);
203    } catch (Exception e) {
204      String message = " while reading " + followingKvCount + " WAL KVs; started reading at "
205        + posBefore + " and read up to " + getPositionQuietly();
206      IOException realEofEx = extractHiddenEof(e);
207      if (realEofEx != null) {
208        LOG.warn("EOF " + message, realEofEx);
209        return editEof();
210      } else {
211        LOG.warn("Error " + message, e);
212        return editError();
213      }
214    }
215    if (actualCells != followingKvCount) {
216      LOG.warn("Only read {} cells, expected {}; started reading at {} and read up to {}",
217        actualCells, followingKvCount, posBefore, getPositionQuietly());
218      return editEof();
219    }
220    long posAfter;
221    try {
222      posAfter = inputStream.getPos();
223    } catch (IOException e) {
224      LOG.warn("failed to get position", e);
225      return editError();
226    }
227    if (trailerPresent && posAfter > this.walEditsStopOffset) {
228      LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {},"
229        + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset);
230      return editEof();
231    }
232    return new Result(State.NORMAL, entry, posAfter);
233  }
234
235  @Override
236  public Result next(long limit) {
237    long originalPosition;
238    try {
239      originalPosition = inputStream.getPos();
240    } catch (IOException e) {
241      LOG.warn("failed to get position", e);
242      return State.EOF_AND_RESET.getResult();
243    }
244    if (reachWALEditsStopOffset(originalPosition)) {
245      return State.EOF_WITH_TRAILER.getResult();
246    }
247    if (limit < 0) {
248      // should be closed WAL file, set to no limit, i.e, just use the original inputStream
249      delegatingInput.setDelegate(inputStream);
250    } else if (limit <= originalPosition) {
251      // no data available, just return EOF
252      return State.EOF_AND_RESET.getResult();
253    } else {
254      // calculate the remaining bytes we can read and set
255      delegatingInput.setDelegate(ByteStreams.limit(inputStream, limit - originalPosition));
256    }
257    ReadWALKeyResult readKeyResult = readWALKey(originalPosition);
258    if (readKeyResult.state != State.NORMAL) {
259      return readKeyResult.state.getResult();
260    }
261    return readWALEdit(readKeyResult.entry, readKeyResult.followingKvCount);
262  }
263
264  private void skipHeader(FSDataInputStream stream) throws IOException {
265    stream.seek(PB_WAL_MAGIC.length);
266    int headerLength = StreamUtils.readRawVarint32(stream);
267    stream.seek(stream.getPos() + headerLength);
268  }
269
270  @Override
271  public void resetTo(long position, boolean resetCompression) throws IOException {
272    close();
273    Pair<FSDataInputStream, FileStatus> pair = open();
274    boolean resetSucceed = false;
275    try {
276      if (!trailerPresent) {
277        // try read trailer this time
278        readTrailer(pair.getFirst(), pair.getSecond());
279      }
280      inputStream = pair.getFirst();
281      delegatingInput.setDelegate(inputStream);
282      if (position < 0) {
283        // read from the beginning
284        if (compressionCtx != null) {
285          compressionCtx.clear();
286        }
287        skipHeader(inputStream);
288      } else if (resetCompression && compressionCtx != null) {
289        // clear compressCtx and skip to the expected position, to fill up the dictionary
290        compressionCtx.clear();
291        skipHeader(inputStream);
292        if (position != inputStream.getPos()) {
293          skipTo(position);
294        }
295      } else {
296        // just seek to the expected position
297        inputStream.seek(position);
298      }
299      resetSucceed = true;
300    } finally {
301      if (!resetSucceed) {
302        // close the input stream to avoid resource leak
303        close();
304      }
305    }
306  }
307
308  @Override
309  protected InputStream getCellCodecInputStream(FSDataInputStream stream) {
310    delegatingInput = new DelegatingInputStream(stream);
311    return delegatingInput;
312  }
313
314  @Override
315  protected void skipTo(long position) throws IOException {
316    for (;;) {
317      Result result = next(-1);
318      if (result.getState() != State.NORMAL) {
319        throw new IOException("Can not skip to the given position " + position + ", stopped at "
320          + result.getEntryEndPos() + " which is still before the give position");
321      }
322      if (result.getEntryEndPos() == position) {
323        return;
324      }
325      if (result.getEntryEndPos() > position) {
326        throw new IOException("Can not skip to the given position " + position + ", stopped at "
327          + result.getEntryEndPos() + " which is already beyond the give position, malformed WAL?");
328      }
329    }
330  }
331}