001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.EOFException; 021import java.io.IOException; 022import java.io.InputStream; 023import org.apache.hadoop.fs.FSDataInputStream; 024import org.apache.hadoop.fs.FileStatus; 025import org.apache.hadoop.hbase.io.DelegatingInputStream; 026import org.apache.hadoop.hbase.io.util.StreamUtils; 027import org.apache.hadoop.hbase.util.Pair; 028import org.apache.hadoop.hbase.wal.WAL.Entry; 029import org.apache.hadoop.hbase.wal.WALTailingReader; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; 035import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 037 038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 040 041/** 042 * A WAL reader for replication. It supports reset so can be used to tail a WAL file which is being 043 * written currently. 044 */ 045@InterfaceAudience.Private 046public class ProtobufWALTailingReader extends AbstractProtobufWALReader 047 implements WALTailingReader { 048 049 private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALTailingReader.class); 050 051 private DelegatingInputStream delegatingInput; 052 053 private static final class ReadWALKeyResult { 054 final State state; 055 final Entry entry; 056 final int followingKvCount; 057 058 public ReadWALKeyResult(State state, Entry entry, int followingKvCount) { 059 this.state = state; 060 this.entry = entry; 061 this.followingKvCount = followingKvCount; 062 } 063 } 064 065 private static final ReadWALKeyResult KEY_ERROR_AND_RESET = 066 new ReadWALKeyResult(State.ERROR_AND_RESET, null, 0); 067 068 private static final ReadWALKeyResult KEY_EOF_AND_RESET = 069 new ReadWALKeyResult(State.EOF_AND_RESET, null, 0); 070 071 private IOException unwrapIPBE(IOException e) { 072 if (e instanceof InvalidProtocolBufferException) { 073 return ((InvalidProtocolBufferException) e).unwrapIOException(); 074 } else { 075 return e; 076 } 077 } 078 079 private ReadWALKeyResult readWALKey(long originalPosition) { 080 int firstByte; 081 try { 082 firstByte = delegatingInput.read(); 083 } catch (IOException e) { 084 LOG.warn("Failed to read wal key length first byte", e); 085 return KEY_ERROR_AND_RESET; 086 } 087 if (firstByte == -1) { 088 return KEY_EOF_AND_RESET; 089 } 090 int size; 091 try { 092 size = CodedInputStream.readRawVarint32(firstByte, delegatingInput); 093 } catch (IOException e) { 094 // if we are reading a partial WALTrailer, the size will just be 0 so we will not get an 095 // exception here, so do not need to check whether it is a partial WALTrailer. 096 if ( 097 e instanceof InvalidProtocolBufferException 098 && ProtobufUtil.isEOF((InvalidProtocolBufferException) e) 099 ) { 100 LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}", 101 originalPosition, getPositionQuietly(), e.toString()); 102 return KEY_EOF_AND_RESET; 103 } else { 104 LOG.warn("Failed to read wal key length", e); 105 return KEY_ERROR_AND_RESET; 106 } 107 } 108 if (size < 0) { 109 LOG.warn("Negative pb message size read: {}, malformed WAL file?", size); 110 return KEY_ERROR_AND_RESET; 111 } 112 int available; 113 try { 114 available = delegatingInput.available(); 115 } catch (IOException e) { 116 LOG.warn("Failed to get available bytes", e); 117 return KEY_ERROR_AND_RESET; 118 } 119 if (available > 0 && available < size) { 120 LOG.info("Available stream not enough for edit, available={}, entry size={} at offset={}", 121 available, size, getPositionQuietly()); 122 return KEY_EOF_AND_RESET; 123 } 124 WALProtos.WALKey walKey; 125 try { 126 if (available > 0) { 127 walKey = WALProtos.WALKey.parseFrom(ByteStreams.limit(delegatingInput, size)); 128 } else { 129 byte[] content = new byte[size]; 130 ByteStreams.readFully(delegatingInput, content); 131 walKey = WALProtos.WALKey.parseFrom(content); 132 } 133 } catch (IOException e) { 134 e = unwrapIPBE(e); 135 if ( 136 e instanceof EOFException || (e instanceof InvalidProtocolBufferException 137 && ProtobufUtil.isEOF((InvalidProtocolBufferException) e)) 138 ) { 139 LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}", 140 originalPosition, getPositionQuietly(), e.toString()); 141 return KEY_EOF_AND_RESET; 142 } else { 143 boolean isWALTrailer; 144 try { 145 isWALTrailer = isWALTrailer(originalPosition); 146 } catch (IOException ioe) { 147 LOG.warn("Error while testing whether this is a partial WAL trailer, originalPosition={}," 148 + " currentPosition={}", originalPosition, getPositionQuietly(), e); 149 return KEY_ERROR_AND_RESET; 150 } 151 if (isWALTrailer) { 152 LOG.info("Reached partial WAL Trailer(EOF) while reading WALKey, originalPosition={}," 153 + " currentPosition={}", originalPosition, getPositionQuietly(), e); 154 return KEY_EOF_AND_RESET; 155 } else { 156 // for all other type of IPBEs or IOEs, it means the WAL key is broken 157 LOG.warn("Error while reading WALKey, originalPosition={}, currentPosition={}", 158 originalPosition, getPositionQuietly(), e); 159 return KEY_ERROR_AND_RESET; 160 } 161 } 162 } 163 Entry entry = new Entry(); 164 try { 165 entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor); 166 } catch (IOException e) { 167 LOG.warn("Failed to read wal key fields from pb message", e); 168 return KEY_ERROR_AND_RESET; 169 } 170 return new ReadWALKeyResult(State.NORMAL, entry, 171 walKey.hasFollowingKvCount() ? walKey.getFollowingKvCount() : 0); 172 } 173 174 private Result editEof() { 175 return hasCompression 176 ? State.EOF_AND_RESET_COMPRESSION.getResult() 177 : State.EOF_AND_RESET.getResult(); 178 } 179 180 private Result editError() { 181 return hasCompression 182 ? State.ERROR_AND_RESET_COMPRESSION.getResult() 183 : State.ERROR_AND_RESET.getResult(); 184 } 185 186 private Result readWALEdit(Entry entry, int followingKvCount) { 187 long posBefore; 188 try { 189 posBefore = inputStream.getPos(); 190 } catch (IOException e) { 191 LOG.warn("failed to get position", e); 192 return State.ERROR_AND_RESET.getResult(); 193 } 194 if (followingKvCount == 0) { 195 LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}", 196 posBefore); 197 return new Result(State.NORMAL, entry, posBefore); 198 } 199 int actualCells; 200 try { 201 actualCells = entry.getEdit().readFromCells(cellDecoder, followingKvCount); 202 } catch (Exception e) { 203 String message = " while reading " + followingKvCount + " WAL KVs; started reading at " 204 + posBefore + " and read up to " + getPositionQuietly(); 205 IOException realEofEx = extractHiddenEof(e); 206 if (realEofEx != null) { 207 LOG.warn("EOF " + message, realEofEx); 208 return editEof(); 209 } else { 210 LOG.warn("Error " + message, e); 211 return editError(); 212 } 213 } 214 if (actualCells != followingKvCount) { 215 LOG.warn("Only read {} cells, expected {}; started reading at {} and read up to {}", 216 actualCells, followingKvCount, posBefore, getPositionQuietly()); 217 return editEof(); 218 } 219 long posAfter; 220 try { 221 posAfter = inputStream.getPos(); 222 } catch (IOException e) { 223 LOG.warn("failed to get position", e); 224 return editError(); 225 } 226 if (trailerPresent && posAfter > this.walEditsStopOffset) { 227 LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {}," 228 + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset); 229 return editEof(); 230 } 231 return new Result(State.NORMAL, entry, posAfter); 232 } 233 234 @Override 235 public Result next(long limit) { 236 long originalPosition; 237 try { 238 originalPosition = inputStream.getPos(); 239 } catch (IOException e) { 240 LOG.warn("failed to get position", e); 241 return State.EOF_AND_RESET.getResult(); 242 } 243 if (reachWALEditsStopOffset(originalPosition)) { 244 return State.EOF_WITH_TRAILER.getResult(); 245 } 246 if (limit < 0) { 247 // should be closed WAL file, set to no limit, i.e, just use the original inputStream 248 delegatingInput.setDelegate(inputStream); 249 } else if (limit <= originalPosition) { 250 // no data available, just return EOF 251 return State.EOF_AND_RESET.getResult(); 252 } else { 253 // calculate the remaining bytes we can read and set 254 delegatingInput.setDelegate(ByteStreams.limit(inputStream, limit - originalPosition)); 255 } 256 ReadWALKeyResult readKeyResult = readWALKey(originalPosition); 257 if (readKeyResult.state != State.NORMAL) { 258 return readKeyResult.state.getResult(); 259 } 260 return readWALEdit(readKeyResult.entry, readKeyResult.followingKvCount); 261 } 262 263 private void skipHeader(FSDataInputStream stream) throws IOException { 264 stream.seek(PB_WAL_MAGIC.length); 265 int headerLength = StreamUtils.readRawVarint32(stream); 266 stream.seek(stream.getPos() + headerLength); 267 } 268 269 @Override 270 public void resetTo(long position, boolean resetCompression) throws IOException { 271 close(); 272 Pair<FSDataInputStream, FileStatus> pair = open(); 273 boolean resetSucceed = false; 274 try { 275 if (!trailerPresent) { 276 // try read trailer this time 277 readTrailer(pair.getFirst(), pair.getSecond()); 278 } 279 inputStream = pair.getFirst(); 280 delegatingInput.setDelegate(inputStream); 281 if (position < 0) { 282 // read from the beginning 283 if (compressionCtx != null) { 284 compressionCtx.clear(); 285 } 286 skipHeader(inputStream); 287 } else if (resetCompression && compressionCtx != null) { 288 // clear compressCtx and skip to the expected position, to fill up the dictionary 289 compressionCtx.clear(); 290 skipHeader(inputStream); 291 if (position != inputStream.getPos()) { 292 skipTo(position); 293 } 294 } else { 295 // just seek to the expected position 296 inputStream.seek(position); 297 } 298 resetSucceed = true; 299 } finally { 300 if (!resetSucceed) { 301 // close the input stream to avoid resource leak 302 close(); 303 } 304 } 305 } 306 307 @Override 308 protected InputStream getCellCodecInputStream(FSDataInputStream stream) { 309 delegatingInput = new DelegatingInputStream(stream); 310 return delegatingInput; 311 } 312 313 @Override 314 protected void skipTo(long position) throws IOException { 315 for (;;) { 316 Result result = next(-1); 317 if (result.getState() != State.NORMAL) { 318 throw new IOException("Can not skip to the given position " + position + ", stopped at " 319 + result.getEntryEndPos() + " which is still before the give position"); 320 } 321 if (result.getEntryEndPos() == position) { 322 return; 323 } 324 if (result.getEntryEndPos() > position) { 325 throw new IOException("Can not skip to the given position " + position + ", stopped at " 326 + result.getEntryEndPos() + " which is already beyond the give position, malformed WAL?"); 327 } 328 } 329 } 330}