001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; 021 022import java.io.IOException; 023import java.io.OutputStream; 024import java.util.Arrays; 025import java.util.List; 026import java.util.concurrent.BlockingQueue; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Abortable; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.ClassSize; 038import org.apache.hadoop.hbase.util.CommonFSUtils; 039import org.apache.hadoop.hbase.wal.FSHLogProvider; 040import org.apache.hadoop.hbase.wal.WALProvider.Writer; 041import org.apache.hadoop.hdfs.DFSOutputStream; 042import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; 043import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * The original implementation of FSWAL. 050 */ 051@InterfaceAudience.Private 052public class FSHLog extends AbstractFSWAL<Writer> { 053 // IMPLEMENTATION NOTES: 054 // 055 // At the core is a ring buffer. Our ring buffer is the LMAX Disruptor. It tries to 056 // minimize synchronizations and volatile writes when multiple contending threads as is the case 057 // here appending and syncing on a single WAL. The Disruptor is configured to handle multiple 058 // producers but it has one consumer only (the producers in HBase are IPC Handlers calling append 059 // and then sync). The single consumer/writer pulls the appends and syncs off the ring buffer. 060 // When a handler calls sync, it is given back a future. The producer 'blocks' on the future so 061 // it does not return until the sync completes. The future is passed over the ring buffer from 062 // the producer/handler to the consumer thread where it does its best to batch up the producer 063 // syncs so one WAL sync actually spans multiple producer sync invocations. How well the 064 // batching works depends on the write rate; i.e. we tend to batch more in times of 065 // high writes/syncs. 066 // 067 // Calls to append now also wait until the append has been done on the consumer side of the 068 // disruptor. We used to not wait but it makes the implementation easier to grok if we have 069 // the region edit/sequence id after the append returns. 070 // 071 // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend 072 // once only? Probably hard given syncs take way longer than an append. 073 // 074 // The consumer threads pass the syncs off to multiple syncing threads in a round robin fashion 075 // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long poll writing the 076 // WAL). The consumer thread passes the futures to the sync threads for it to complete 077 // the futures when done. 078 // 079 // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It 080 // acts as a sort-of transaction id. It is always incrementing. 081 // 082 // The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that 083 // do the actual FS sync are implementations of SyncRunner. SafePointZigZagLatch is a 084 // synchronization class used to halt the consumer at a safe point -- just after all outstanding 085 // syncs and appends have completed -- so the log roller can swap the WAL out under it. 086 // 087 // We use ring buffer sequence as txid of FSWALEntry and SyncFuture. 088 private static final Logger LOG = LoggerFactory.getLogger(FSHLog.class); 089 090 private static final String TOLERABLE_LOW_REPLICATION = 091 "hbase.regionserver.hlog.tolerable.lowreplication"; 092 private static final String LOW_REPLICATION_ROLL_LIMIT = 093 "hbase.regionserver.hlog.lowreplication.rolllimit"; 094 private static final int DEFAULT_LOW_REPLICATION_ROLL_LIMIT = 5; 095 private static final String SYNCER_COUNT = "hbase.regionserver.hlog.syncer.count"; 096 private static final int DEFAULT_SYNCER_COUNT = 5; 097 private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count"; 098 private static final int DEFAULT_MAX_BATCH_COUNT = 200; 099 100 private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 101 "hbase.wal.fshlog.wait.on.shutdown.seconds"; 102 private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5; 103 104 private static final IOException WITER_REPLACED_EXCEPTION = 105 new IOException("Writer was replaced!"); 106 private static final IOException WITER_BROKEN_EXCEPTION = new IOException("Wirter was broken!"); 107 private static final IOException WAL_CLOSE_EXCEPTION = new IOException("WAL was closed!"); 108 109 /** 110 * FSDataOutputStream associated with the current SequenceFile.writer 111 */ 112 private FSDataOutputStream hdfs_out; 113 114 // All about log rolling if not enough replicas outstanding. 115 116 // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered 117 private final int minTolerableReplication; 118 119 // If live datanode count is lower than the default replicas value, 120 // RollWriter will be triggered in each sync(So the RollWriter will be 121 // triggered one by one in a short time). Using it as a workaround to slow 122 // down the roll frequency triggered by checkLowReplication(). 123 private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0); 124 125 private final int lowReplicationRollLimit; 126 127 // If consecutiveLogRolls is larger than lowReplicationRollLimit, 128 // then disable the rolling in checkLowReplication(). 129 // Enable it if the replications recover. 130 private volatile boolean lowReplicationRollEnabled = true; 131 132 private final int syncerCount; 133 private final int maxSyncRequestCount; 134 135 /** 136 * Which syncrunner to use next. 137 */ 138 private int syncRunnerIndex = 0; 139 140 private SyncRunner[] syncRunners = null; 141 142 /** 143 * Constructor. 144 * @param fs filesystem handle 145 * @param root path for stored and archived wals 146 * @param logDir dir where wals are stored 147 * @param conf configuration to use 148 */ 149 public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf) 150 throws IOException { 151 this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); 152 } 153 154 public FSHLog(final FileSystem fs, Abortable abortable, final Path root, final String logDir, 155 final Configuration conf) throws IOException { 156 this(fs, abortable, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, 157 null, null, null); 158 } 159 160 public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, 161 final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, 162 final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { 163 this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, 164 null, null); 165 } 166 167 /** 168 * Create an edit log at the given <code>dir</code> location. You should never have to load an 169 * existing log. If there is a log at startup, it should have already been processed and deleted 170 * by the time the WAL object is started up. 171 * @param fs filesystem handle 172 * @param abortable Abortable - the server here 173 * @param rootDir path to where logs and oldlogs 174 * @param logDir dir where wals are stored 175 * @param archiveDir dir where wals are archived 176 * @param conf configuration to use 177 * @param listeners Listeners on WAL events. Listeners passed here will be registered before 178 * we do anything else; e.g. the Constructor {@link #rollWriter()}. 179 * @param failIfWALExists If true IOException will be thrown if files related to this wal already 180 * exist. 181 * @param prefix should always be hostname and port in distributed env and it will be URL 182 * encoded before being used. If prefix is null, "wal" will be used 183 * @param suffix will be url encoded. null is treated as empty. non-empty must start with 184 * {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER} 185 */ 186 public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir, 187 final String logDir, final String archiveDir, final Configuration conf, 188 final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, 189 final String suffix, FileSystem remoteFs, Path remoteWALDir) throws IOException { 190 super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, 191 suffix, remoteFs, remoteWALDir); 192 this.minTolerableReplication = 193 conf.getInt(TOLERABLE_LOW_REPLICATION, CommonFSUtils.getDefaultReplication(fs, this.walDir)); 194 this.lowReplicationRollLimit = 195 conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT); 196 197 // Advance the ring buffer sequence so that it starts from 1 instead of 0, 198 // because SyncFuture.NOT_DONE = 0. 199 200 this.syncerCount = conf.getInt(SYNCER_COUNT, DEFAULT_SYNCER_COUNT); 201 this.maxSyncRequestCount = conf.getInt(MAX_BATCH_COUNT, 202 conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, DEFAULT_MAX_BATCH_COUNT)); 203 204 this.createSingleThreadPoolConsumeExecutor("FSHLog", rootDir, prefix); 205 206 this.setWaitOnShutdownInSeconds( 207 conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS, DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS), 208 FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS); 209 } 210 211 @Override 212 public void init() throws IOException { 213 super.init(); 214 this.createSyncRunnersAndStart(); 215 } 216 217 private void createSyncRunnersAndStart() { 218 this.syncRunnerIndex = 0; 219 this.syncRunners = new SyncRunner[syncerCount]; 220 for (int i = 0; i < syncerCount; i++) { 221 this.syncRunners[i] = new SyncRunner("sync." + i, maxSyncRequestCount); 222 this.syncRunners[i].start(); 223 } 224 } 225 226 /** 227 * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the 228 * default behavior (such as setting the maxRecoveryErrorCount value). This is done using 229 * reflection on the underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1 230 * support is removed. 231 * @return null if underlying stream is not ready. 232 */ 233 OutputStream getOutputStream() { 234 FSDataOutputStream fsdos = this.hdfs_out; 235 return fsdos != null ? fsdos.getWrappedStream() : null; 236 } 237 238 /** 239 * Run a sync after opening to set up the pipeline. 240 */ 241 private void preemptiveSync(final ProtobufLogWriter nextWriter) { 242 long startTimeNanos = System.nanoTime(); 243 try { 244 nextWriter.sync(useHsync); 245 postSync(System.nanoTime() - startTimeNanos, 0); 246 } catch (IOException e) { 247 // optimization failed, no need to abort here. 248 LOG.warn("pre-sync failed but an optimization so keep going", e); 249 } 250 } 251 252 /** 253 * This method allows subclasses to inject different writers without having to extend other 254 * methods like rollWriter(). 255 * @return Writer instance 256 */ 257 @Override 258 protected Writer createWriterInstance(FileSystem fs, Path path) throws IOException { 259 Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize); 260 if (writer instanceof ProtobufLogWriter) { 261 preemptiveSync((ProtobufLogWriter) writer); 262 } 263 return writer; 264 } 265 266 @Override 267 protected void doAppend(Writer writer, FSWALEntry entry) throws IOException { 268 writer.append(entry); 269 } 270 271 @Override 272 protected void onWriterReplaced(Writer nextWriter) { 273 if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) { 274 this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream(); 275 } else { 276 this.hdfs_out = null; 277 } 278 this.createSyncRunnersAndStart(); 279 } 280 281 @Override 282 protected void doCleanUpResources() { 283 this.shutDownSyncRunners(); 284 }; 285 286 private void shutDownSyncRunners() { 287 SyncRunner[] syncRunnersToUse = this.syncRunners; 288 if (syncRunnersToUse != null) { 289 for (SyncRunner syncRunner : syncRunnersToUse) { 290 syncRunner.shutDown(); 291 } 292 } 293 this.syncRunners = null; 294 } 295 296 @Override 297 protected CompletableFuture<Long> doWriterSync(Writer writer, boolean shouldUseHSync, 298 long txidWhenSync) { 299 CompletableFuture<Long> future = new CompletableFuture<>(); 300 SyncRequest syncRequest = new SyncRequest(writer, shouldUseHSync, txidWhenSync, future); 301 this.offerSyncRequest(syncRequest); 302 return future; 303 } 304 305 private void offerSyncRequest(SyncRequest syncRequest) { 306 for (int i = 0; i < this.syncRunners.length; i++) { 307 this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length; 308 if (this.syncRunners[this.syncRunnerIndex].offer(syncRequest)) { 309 return; 310 } 311 } 312 syncRequest.completableFuture 313 .completeExceptionally(new IOException("There is no available syncRunner.")); 314 } 315 316 static class SyncRequest { 317 private final Writer writer; 318 private final boolean shouldUseHSync; 319 private final long sequenceWhenSync; 320 private final CompletableFuture<Long> completableFuture; 321 322 public SyncRequest(Writer writer, boolean shouldUseHSync, long txidWhenSync, 323 CompletableFuture<Long> completableFuture) { 324 this.writer = writer; 325 this.shouldUseHSync = shouldUseHSync; 326 this.sequenceWhenSync = txidWhenSync; 327 this.completableFuture = completableFuture; 328 } 329 330 } 331 332 /** 333 * Thread to runs the hdfs sync call. This call takes a while to complete. This is the longest 334 * pole adding edits to the WAL and this must complete to be sure all edits persisted. We run 335 * multiple threads sync'ng rather than one that just syncs in series so we have better latencies; 336 * otherwise, an edit that arrived just after a sync started, might have to wait almost the length 337 * of two sync invocations before it is marked done. 338 * <p> 339 * When the sync completes, it marks all the passed in futures done. On the other end of the sync 340 * future is a blocked thread, usually a regionserver Handler. There may be more than one future 341 * passed in the case where a few threads arrive at about the same time and all invoke 'sync'. In 342 * this case we'll batch up the invocations and run one filesystem sync only for a batch of 343 * Handler sync invocations. Do not confuse these Handler SyncFutures with the futures an 344 * ExecutorService returns when you call submit. We have no use for these in this model. These 345 * SyncFutures are 'artificial', something to hold the Handler until the filesystem sync 346 * completes. 347 */ 348 private class SyncRunner extends Thread { 349 // Keep around last exception thrown. Clear on successful sync. 350 private final BlockingQueue<SyncRequest> syncRequests; 351 private volatile boolean shutDown = false; 352 353 SyncRunner(final String name, final int maxHandlersCount) { 354 super(name); 355 // LinkedBlockingQueue because of 356 // http://www.javacodegeeks.com/2010/09/java-best-practices-queue-battle-and.html 357 // Could use other blockingqueues here or concurrent queues. 358 // 359 // We could let the capacity be 'open' but bound it so we get alerted in pathological case 360 // where we cannot sync and we have a bunch of threads all backed up waiting on their syncs 361 // to come in. LinkedBlockingQueue actually shrinks when you remove elements so Q should 362 // stay neat and tidy in usual case. Let the max size be three times the maximum handlers. 363 // The passed in maxHandlerCount is the user-level handlers which is what we put up most of 364 // but HBase has other handlers running too -- opening region handlers which want to write 365 // the meta table when succesful (i.e. sync), closing handlers -- etc. These are usually 366 // much fewer in number than the user-space handlers so Q-size should be user handlers plus 367 // some space for these other handlers. Lets multiply by 3 for good-measure. 368 this.syncRequests = new LinkedBlockingQueue<>(maxHandlersCount * 3); 369 } 370 371 boolean offer(SyncRequest syncRequest) { 372 if (this.shutDown) { 373 return false; 374 } 375 376 if (!this.syncRequests.offer(syncRequest)) { 377 return false; 378 } 379 380 // recheck 381 if (this.shutDown) { 382 if (this.syncRequests.remove(syncRequest)) { 383 return false; 384 } 385 } 386 return true; 387 } 388 389 private void completeSyncRequests(SyncRequest syncRequest, long syncedSequenceId) { 390 if (syncRequest != null) { 391 syncRequest.completableFuture.complete(syncedSequenceId); 392 } 393 while (true) { 394 SyncRequest head = this.syncRequests.peek(); 395 if (head == null) { 396 break; 397 } 398 if (head.sequenceWhenSync > syncedSequenceId) { 399 break; 400 } 401 head.completableFuture.complete(syncedSequenceId); 402 this.syncRequests.poll(); 403 } 404 } 405 406 private void completeExceptionallySyncRequests(SyncRequest syncRequest, Exception exception) { 407 if (syncRequest != null) { 408 syncRequest.completableFuture.completeExceptionally(exception); 409 } 410 while (true) { 411 SyncRequest head = this.syncRequests.peek(); 412 if (head == null) { 413 break; 414 } 415 if (head.writer != syncRequest.writer) { 416 break; 417 } 418 head.completableFuture.completeExceptionally(exception); 419 this.syncRequests.poll(); 420 } 421 } 422 423 private SyncRequest takeSyncRequest() throws InterruptedException { 424 while (true) { 425 // We have to process what we 'take' from the queue 426 SyncRequest syncRequest = this.syncRequests.take(); 427 // See if we can process any syncfutures BEFORE we go sync. 428 long currentHighestSyncedSequence = highestSyncedTxid.get(); 429 if (syncRequest.sequenceWhenSync < currentHighestSyncedSequence) { 430 syncRequest.completableFuture.complete(currentHighestSyncedSequence); 431 continue; 432 } 433 return syncRequest; 434 } 435 } 436 437 @Override 438 public void run() { 439 while (!this.shutDown) { 440 try { 441 SyncRequest syncRequest = this.takeSyncRequest(); 442 // I got something. Lets run. Save off current sequence number in case it changes 443 // while we run. 444 long currentSequenceToUse = syncRequest.sequenceWhenSync; 445 boolean writerBroken = isWriterBroken(); 446 long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; 447 Writer currentWriter = writer; 448 if (currentWriter != syncRequest.writer) { 449 syncRequest.completableFuture.completeExceptionally(WITER_REPLACED_EXCEPTION); 450 continue; 451 } 452 if (writerBroken) { 453 syncRequest.completableFuture.completeExceptionally(WITER_BROKEN_EXCEPTION); 454 continue; 455 } 456 if (currentHighestProcessedAppendTxid > currentSequenceToUse) { 457 currentSequenceToUse = currentHighestProcessedAppendTxid; 458 } 459 Exception lastException = null; 460 try { 461 writer.sync(syncRequest.shouldUseHSync); 462 } catch (IOException e) { 463 LOG.error("Error syncing", e); 464 lastException = e; 465 } catch (Exception e) { 466 LOG.warn("UNEXPECTED", e); 467 lastException = e; 468 } finally { 469 if (lastException != null) { 470 this.completeExceptionallySyncRequests(syncRequest, lastException); 471 } else { 472 this.completeSyncRequests(syncRequest, currentSequenceToUse); 473 } 474 } 475 } catch (InterruptedException e) { 476 // Presume legit interrupt. 477 LOG.info("interrupted"); 478 } catch (Throwable t) { 479 LOG.warn("UNEXPECTED, continuing", t); 480 } 481 } 482 this.clearSyncRequestsWhenShutDown(); 483 } 484 485 private void clearSyncRequestsWhenShutDown() { 486 while (true) { 487 SyncRequest syncRequest = this.syncRequests.poll(); 488 if (syncRequest == null) { 489 break; 490 } 491 syncRequest.completableFuture.completeExceptionally(WAL_CLOSE_EXCEPTION); 492 } 493 } 494 495 void shutDown() { 496 try { 497 this.shutDown = true; 498 this.interrupt(); 499 this.join(); 500 } catch (InterruptedException e) { 501 LOG.warn("interrupted", e); 502 Thread.currentThread().interrupt(); 503 } 504 } 505 } 506 507 @Override 508 protected void checkSlowSyncCount() { 509 if (isLogRollRequested()) { 510 return; 511 } 512 if (doCheckSlowSync()) { 513 // We log this already in checkSlowSync 514 requestLogRoll(SLOW_SYNC); 515 } 516 } 517 518 /** Returns true if number of replicas for the WAL is lower than threshold */ 519 @Override 520 protected boolean doCheckLogLowReplication() { 521 boolean logRollNeeded = false; 522 // if the number of replicas in HDFS has fallen below the configured 523 // value, then roll logs. 524 try { 525 int numCurrentReplicas = getLogReplication(); 526 if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) { 527 if (this.lowReplicationRollEnabled) { 528 if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) { 529 LOG.warn("HDFS pipeline error detected. " + "Found " + numCurrentReplicas 530 + " replicas but expecting no less than " + this.minTolerableReplication 531 + " replicas. " + " Requesting close of WAL. current pipeline: " 532 + Arrays.toString(getPipeline())); 533 logRollNeeded = true; 534 // If rollWriter is requested, increase consecutiveLogRolls. Once it 535 // is larger than lowReplicationRollLimit, disable the 536 // LowReplication-Roller 537 this.consecutiveLogRolls.getAndIncrement(); 538 } else { 539 LOG.warn("Too many consecutive RollWriter requests, it's a sign of " 540 + "the total number of live datanodes is lower than the tolerable replicas."); 541 this.consecutiveLogRolls.set(0); 542 this.lowReplicationRollEnabled = false; 543 } 544 } 545 } else if (numCurrentReplicas >= this.minTolerableReplication) { 546 if (!this.lowReplicationRollEnabled) { 547 // The new writer's log replicas is always the default value. 548 // So we should not enable LowReplication-Roller. If numEntries 549 // is lower than or equals 1, we consider it as a new writer. 550 if (this.numEntries.get() <= 1) { 551 return logRollNeeded; 552 } 553 // Once the live datanode number and the replicas return to normal, 554 // enable the LowReplication-Roller. 555 this.lowReplicationRollEnabled = true; 556 LOG.info("LowReplication-Roller was enabled."); 557 } 558 } 559 } catch (Exception e) { 560 LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + ", continuing..."); 561 } 562 return logRollNeeded; 563 } 564 565 /** 566 * {@inheritDoc} 567 * <p> 568 * If the pipeline isn't started yet or is empty, you will get the default replication factor. 569 * Therefore, if this function returns 0, it means you are not properly running with the HDFS-826 570 * patch. 571 */ 572 @Override 573 int getLogReplication() { 574 try { 575 // in standalone mode, it will return 0 576 if (this.hdfs_out instanceof HdfsDataOutputStream) { 577 return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication(); 578 } 579 } catch (IOException e) { 580 LOG.info("", e); 581 } 582 return 0; 583 } 584 585 boolean isLowReplicationRollEnabled() { 586 return lowReplicationRollEnabled; 587 } 588 589 public static final long FIXED_OVERHEAD = 590 ClassSize.align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + (2 * ClassSize.ATOMIC_INTEGER) 591 + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG)); 592 593 /** 594 * This method gets the pipeline for the current WAL. 595 */ 596 @Override 597 DatanodeInfo[] getPipeline() { 598 if (this.hdfs_out != null) { 599 if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) { 600 return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline(); 601 } 602 } 603 return new DatanodeInfo[0]; 604 } 605 606 @Override 607 protected Writer createCombinedWriter(Writer localWriter, Writer remoteWriter) { 608 // put remote writer first as usually it will cost more time to finish, so we write to it first 609 return CombinedWriter.create(remoteWriter, localWriter); 610 } 611}