001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.Closeable; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.util.OptionalLong; 024import java.util.concurrent.PriorityBlockingQueue; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 031import org.apache.hadoop.hbase.util.CancelableProgressable; 032import org.apache.hadoop.hbase.util.CommonFSUtils; 033import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 034import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; 035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 036import org.apache.hadoop.hbase.wal.WAL.Entry; 037import org.apache.hadoop.hbase.wal.WAL.Reader; 038import org.apache.hadoop.hbase.wal.WALFactory; 039import org.apache.hadoop.ipc.RemoteException; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.yetus.audience.InterfaceStability; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually 047 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it 048 * dequeues it and starts reading from the next. 049 */ 050@InterfaceAudience.Private 051@InterfaceStability.Evolving 052class WALEntryStream implements Closeable { 053 private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); 054 055 private Reader reader; 056 private Path currentPath; 057 // cache of next entry for hasNext() 058 private Entry currentEntry; 059 // position for the current entry. As now we support peek, which means that the upper layer may 060 // choose to return before reading the current entry, so it is not safe to return the value below 061 // in getPosition. 062 private long currentPositionOfEntry = 0; 063 // position after reading current entry 064 private long currentPositionOfReader = 0; 065 private final ReplicationSourceLogQueue logQueue; 066 private final String walGroupId; 067 private final FileSystem fs; 068 private final Configuration conf; 069 private final WALFileLengthProvider walFileLengthProvider; 070 // which region server the WALs belong to 071 private final ServerName serverName; 072 private final MetricsSource metrics; 073 074 /** 075 * Create an entry stream over the given queue at the given start position 076 * @param logQueue the queue of WAL paths 077 * @param conf the {@link Configuration} to use to create {@link Reader} for this 078 * stream 079 * @param startPosition the position in the first WAL to start reading at 080 * @param walFileLengthProvider provides the length of the WAL file 081 * @param serverName the server name which all WALs belong to 082 * @param metrics the replication metrics 083 * @throws IOException throw IO exception from stream 084 */ 085 public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition, 086 WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics, 087 String walGroupId) throws IOException { 088 this.logQueue = logQueue; 089 this.fs = CommonFSUtils.getWALFileSystem(conf); 090 this.conf = conf; 091 this.currentPositionOfEntry = startPosition; 092 this.walFileLengthProvider = walFileLengthProvider; 093 this.serverName = serverName; 094 this.metrics = metrics; 095 this.walGroupId = walGroupId; 096 } 097 098 /** Returns true if there is another WAL {@link Entry} */ 099 public boolean hasNext() throws IOException { 100 if (currentEntry == null) { 101 tryAdvanceEntry(); 102 } 103 return currentEntry != null; 104 } 105 106 /** 107 * Returns the next WAL entry in this stream but does not advance. 108 */ 109 public Entry peek() throws IOException { 110 return hasNext() ? currentEntry : null; 111 } 112 113 /** 114 * Returns the next WAL entry in this stream and advance the stream. 115 */ 116 public Entry next() throws IOException { 117 Entry save = peek(); 118 currentPositionOfEntry = currentPositionOfReader; 119 currentEntry = null; 120 return save; 121 } 122 123 /** 124 * {@inheritDoc} 125 */ 126 @Override 127 public void close() throws IOException { 128 closeReader(); 129 } 130 131 /** Returns the position of the last Entry returned by next() */ 132 public long getPosition() { 133 return currentPositionOfEntry; 134 } 135 136 /** Returns the {@link Path} of the current WAL */ 137 public Path getCurrentPath() { 138 return currentPath; 139 } 140 141 private String getCurrentPathStat() { 142 StringBuilder sb = new StringBuilder(); 143 if (currentPath != null) { 144 sb.append("currently replicating from: ").append(currentPath).append(" at position: ") 145 .append(currentPositionOfEntry).append("\n"); 146 } else { 147 sb.append("no replication ongoing, waiting for new log"); 148 } 149 return sb.toString(); 150 } 151 152 /** 153 * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned 154 * false) 155 */ 156 public void reset() throws IOException { 157 if (reader != null && currentPath != null) { 158 resetReader(); 159 } 160 } 161 162 private void setPosition(long position) { 163 currentPositionOfEntry = position; 164 } 165 166 private void setCurrentPath(Path path) { 167 this.currentPath = path; 168 } 169 170 private void tryAdvanceEntry() throws IOException { 171 if (checkReader()) { 172 boolean beingWritten = readNextEntryAndRecordReaderPosition(); 173 LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten); 174 if (currentEntry == null && !beingWritten) { 175 // no more entries in this log file, and the file is already closed, i.e, rolled 176 // Before dequeueing, we should always get one more attempt at reading. 177 // This is in case more entries came in after we opened the reader, and the log is rolled 178 // while we were reading. See HBASE-6758 179 resetReader(); 180 readNextEntryAndRecordReaderPosition(); 181 if (currentEntry == null) { 182 if (checkAllBytesParsed()) { // now we're certain we're done with this log file 183 dequeueCurrentLog(); 184 if (openNextLog()) { 185 readNextEntryAndRecordReaderPosition(); 186 } 187 } 188 } 189 } 190 // if currentEntry != null then just return 191 // if currentEntry == null but the file is still being written, then we should not switch to 192 // the next log either, just return here and try next time to see if there are more entries in 193 // the current file 194 } 195 // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) 196 } 197 198 // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file 199 private boolean checkAllBytesParsed() throws IOException { 200 // -1 means the wal wasn't closed cleanly. 201 final long trailerSize = currentTrailerSize(); 202 FileStatus stat = null; 203 try { 204 stat = fs.getFileStatus(this.currentPath); 205 } catch (IOException exception) { 206 LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", 207 currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat()); 208 metrics.incrUnknownFileLengthForClosedWAL(); 209 } 210 // Here we use currentPositionOfReader instead of currentPositionOfEntry. 211 // We only call this method when currentEntry is null so usually they are the same, but there 212 // are two exceptions. One is we have nothing in the file but only a header, in this way 213 // the currentPositionOfEntry will always be 0 since we have no change to update it. The other 214 // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the 215 // last valid entry, and the currentPositionOfReader will usually point to the end of the file. 216 if (stat != null) { 217 if (trailerSize < 0) { 218 if (currentPositionOfReader < stat.getLen()) { 219 final long skippedBytes = stat.getLen() - currentPositionOfReader; 220 // See the commits in HBASE-25924/HBASE-25932 for context. 221 LOG.warn("Reached the end of WAL {}. It was not closed cleanly," 222 + " so we did not parse {} bytes of data.", currentPath, skippedBytes); 223 metrics.incrUncleanlyClosedWALs(); 224 metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); 225 } 226 } else if (currentPositionOfReader + trailerSize < stat.getLen()) { 227 LOG.warn( 228 "Processing end of WAL {} at position {}, which is too far away from" 229 + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", 230 currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); 231 setPosition(0); 232 resetReader(); 233 metrics.incrRestartedWALReading(); 234 metrics.incrRepeatedFileBytes(currentPositionOfReader); 235 return false; 236 } 237 } 238 if (LOG.isTraceEnabled()) { 239 LOG.trace("Reached the end of " + this.currentPath + " and length of the file is " 240 + (stat == null ? "N/A" : stat.getLen())); 241 } 242 metrics.incrCompletedWAL(); 243 return true; 244 } 245 246 private void dequeueCurrentLog() throws IOException { 247 LOG.debug("EOF, closing {}", currentPath); 248 closeReader(); 249 logQueue.remove(walGroupId); 250 setCurrentPath(null); 251 setPosition(0); 252 } 253 254 /** 255 * Returns whether the file is opened for writing. 256 */ 257 private boolean readNextEntryAndRecordReaderPosition() throws IOException { 258 Entry readEntry = reader.next(); 259 long readerPos = reader.getPosition(); 260 OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); 261 if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { 262 // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted 263 // data, so we need to make sure that we do not read beyond the committed file length. 264 if (LOG.isDebugEnabled()) { 265 LOG.debug("The provider tells us the valid length for " + currentPath + " is " 266 + fileLength.getAsLong() + ", but we have advanced to " + readerPos); 267 } 268 resetReader(); 269 return true; 270 } 271 if (readEntry != null) { 272 LOG.trace("reading entry: {} ", readEntry); 273 metrics.incrLogEditsRead(); 274 metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); 275 } 276 currentEntry = readEntry; // could be null 277 this.currentPositionOfReader = readerPos; 278 return fileLength.isPresent(); 279 } 280 281 private void closeReader() throws IOException { 282 if (reader != null) { 283 reader.close(); 284 reader = null; 285 } 286 } 287 288 // if we don't have a reader, open a reader on the next log 289 private boolean checkReader() throws IOException { 290 if (reader == null) { 291 return openNextLog(); 292 } 293 return true; 294 } 295 296 // open a reader on the next log in queue 297 private boolean openNextLog() throws IOException { 298 PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); 299 Path nextPath = queue.peek(); 300 if (nextPath != null) { 301 openReader(nextPath); 302 if (reader != null) { 303 return true; 304 } 305 } else { 306 // no more files in queue, this could happen for recovered queue, or for a wal group of a sync 307 // replication peer which has already been transited to DA or S. 308 setCurrentPath(null); 309 } 310 return false; 311 } 312 313 private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { 314 // If the log was archived, continue reading from there 315 Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf); 316 // archivedLog can be null if unable to locate in archiveDir. 317 if (archivedLog != null) { 318 openReader(archivedLog); 319 } else { 320 throw fnfe; 321 } 322 } 323 324 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION", 325 justification = "HDFS-4380") 326 private void openReader(Path path) throws IOException { 327 try { 328 // Detect if this is a new file, if so get a new reader else 329 // reset the current reader so that we see the new data 330 if (reader == null || !getCurrentPath().equals(path)) { 331 closeReader(); 332 reader = WALFactory.createReader(fs, path, conf); 333 seek(); 334 setCurrentPath(path); 335 } else { 336 resetReader(); 337 } 338 } catch (FileNotFoundException fnfe) { 339 handleFileNotFound(path, fnfe); 340 } catch (RemoteException re) { 341 IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); 342 if (!(ioe instanceof FileNotFoundException)) { 343 throw ioe; 344 } 345 handleFileNotFound(path, (FileNotFoundException) ioe); 346 } catch (LeaseNotRecoveredException lnre) { 347 // HBASE-15019 the WAL was not closed due to some hiccup. 348 LOG.warn("Try to recover the WAL lease " + path, lnre); 349 recoverLease(conf, path); 350 reader = null; 351 } catch (NullPointerException npe) { 352 // Workaround for race condition in HDFS-4380 353 // which throws a NPE if we open a file before any data node has the most recent block 354 // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 355 LOG.warn("Got NPE opening reader, will retry."); 356 reader = null; 357 } 358 } 359 360 // For HBASE-15019 361 private void recoverLease(final Configuration conf, final Path path) { 362 try { 363 final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf); 364 RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { 365 @Override 366 public boolean progress() { 367 LOG.debug("recover WAL lease: " + path); 368 return true; 369 } 370 }); 371 } catch (IOException e) { 372 LOG.warn("unable to recover lease for WAL: " + path, e); 373 } 374 } 375 376 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION", 377 justification = "HDFS-4380") 378 private void resetReader() throws IOException { 379 try { 380 currentEntry = null; 381 reader.reset(); 382 seek(); 383 } catch (FileNotFoundException fnfe) { 384 // If the log was archived, continue reading from there 385 Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf); 386 // archivedLog can be null if unable to locate in archiveDir. 387 if (archivedLog != null) { 388 openReader(archivedLog); 389 } else { 390 throw fnfe; 391 } 392 } catch (NullPointerException npe) { 393 throw new IOException("NPE resetting reader, likely HDFS-4380", npe); 394 } 395 } 396 397 private void seek() throws IOException { 398 if (currentPositionOfEntry != 0) { 399 reader.seek(currentPositionOfEntry); 400 } 401 } 402 403 private long currentTrailerSize() { 404 long size = -1L; 405 if (reader instanceof ProtobufLogReader) { 406 final ProtobufLogReader pblr = (ProtobufLogReader) reader; 407 size = pblr.trailerSize(); 408 } 409 return size; 410 } 411}