001/**
002 * Copyright 2010 The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020package org.apache.hadoop.hbase.regionserver.wal;
021
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.OutputStream;
027import java.io.UnsupportedEncodingException;
028import java.lang.reflect.InvocationTargetException;
029import java.lang.reflect.Method;
030import java.net.URLEncoder;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.NavigableSet;
038import java.util.SortedMap;
039import java.util.TreeMap;
040import java.util.TreeSet;
041import java.util.UUID;
042import java.util.concurrent.ConcurrentSkipListMap;
043import java.util.concurrent.CopyOnWriteArrayList;
044import java.util.concurrent.atomic.AtomicBoolean;
045import java.util.concurrent.atomic.AtomicInteger;
046import java.util.concurrent.atomic.AtomicLong;
047import java.util.concurrent.locks.Lock;
048import java.util.concurrent.locks.ReentrantLock;
049import java.util.regex.Matcher;
050import java.util.regex.Pattern;
051
052import org.apache.commons.logging.Log;
053import org.apache.commons.logging.LogFactory;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.hadoop.fs.FSDataOutputStream;
056import org.apache.hadoop.fs.FileStatus;
057import org.apache.hadoop.fs.FileSystem;
058import org.apache.hadoop.fs.Path;
059import org.apache.hadoop.fs.PathFilter;
060import org.apache.hadoop.fs.Syncable;
061import org.apache.hadoop.hbase.HBaseConfiguration;
062import org.apache.hadoop.hbase.HBaseFileSystem;
063import org.apache.hadoop.hbase.HConstants;
064import org.apache.hadoop.hbase.HRegionInfo;
065import org.apache.hadoop.hbase.HTableDescriptor;
066import org.apache.hadoop.hbase.KeyValue;
067import org.apache.hadoop.hbase.ServerName;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.ClassSize;
070import org.apache.hadoop.hbase.util.FSUtils;
071import org.apache.hadoop.hbase.util.HasThread;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.io.Writable;
074import org.apache.hadoop.util.StringUtils;
075
076/**
077 * HLog stores all the edits to the HStore.  It is the HBase write-ahead-log
078 * implementation.
079 *
080 * It performs logfile-rolling, so external callers are not aware that the
081 * underlying file is being rolled.
082 *
083 * <p>
084 * There is one HLog per RegionServer.  All edits for all Regions carried by
085 * a particular RegionServer are entered first in the HLog.
086 *
087 * <p>
088 * Each HRegion is identified by a unique <code>long</code> id. HRegions do
089 * not need to declare themselves before using the HLog; they simply include
090 * their HRegion-id in the <code>append</code> or
091 * <code>completeCacheFlush</code> calls.
092 *
093 * <p>
094 * An HLog consists of multiple on-disk files, which have a chronological order.
095 * As data is flushed to other (better) on-disk structures, the log becomes
096 * obsolete. We can destroy all the log messages for a given HRegion-id up to
097 * the most-recent CACHEFLUSH message from that HRegion.
098 *
099 * <p>
100 * It's only practical to delete entire files. Thus, we delete an entire on-disk
101 * file F when all of the messages in F have a log-sequence-id that's older
102 * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
103 * a message in F.
104 *
105 * <p>
106 * Synchronized methods can never execute in parallel. However, between the
107 * start of a cache flush and the completion point, appends are allowed but log
108 * rolling is not. To prevent log rolling taking place during this period, a
109 * separate reentrant lock is used.
110 *
111 * <p>To read an HLog, call {@link #getReader(org.apache.hadoop.fs.FileSystem,
112 * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
113 *
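 * <p>For illustration, a minimal reading sketch (assuming the usual
 * <code>getKey()</code>/<code>getEdit()</code> accessors on {@link HLog.Entry}):
 * <pre>
 *   HLog.Reader reader = HLog.getReader(fs, hlogPath, conf);
 *   try {
 *     HLog.Entry entry;
 *     while ((entry = reader.next()) != null) {
 *       // entry.getKey() carries region/table/sequence info,
 *       // entry.getEdit() carries the KeyValues of the edit
 *     }
 *   } finally {
 *     reader.close();
 *   }
 * </pre>
 *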
114 */
115public class HLog implements Syncable {
116  static final Log LOG = LogFactory.getLog(HLog.class);
117  public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
118  static final byte [] METAROW = Bytes.toBytes("METAROW");
119
120  /** File Extension used while splitting an HLog into regions (HBASE-2312) */
121  public static final String SPLITTING_EXT = "-splitting";
122  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
123  /** The META region's HLog filename extension */
124  public static final String META_HLOG_FILE_EXTN = ".meta";
125  public static final String SEPARATE_HLOG_FOR_META = "hbase.regionserver.separate.hlog.for.meta";
126
127  /*
128   * Name of directory that holds recovered edits written by the WAL
129   * splitting code, one per region.
130   */
131  public static final String RECOVERED_EDITS_DIR = "recovered.edits";
132  private static final Pattern EDITFILES_NAME_PATTERN =
133    Pattern.compile("-?[0-9]+");
134  public static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
135  
136  private final FileSystem fs;
137  private final Path dir;
138  private final Configuration conf;
139  private final HLogFileSystem hlogFs;
140  // Listeners that are called on WAL events.
141  private List<WALActionsListener> listeners =
142    new CopyOnWriteArrayList<WALActionsListener>();
143  private final long optionalFlushInterval;
144  private final long blocksize;
145  private final String prefix;
146  private final AtomicLong unflushedEntries = new AtomicLong(0);
147  private volatile long syncedTillHere = 0;
148  private long lastDeferredTxid;
149  private final Path oldLogDir;
150  private volatile boolean logRollRunning;
151
152  private static Class<? extends Writer> logWriterClass;
153  private static Class<? extends Reader> logReaderClass;
154
155  private WALCoprocessorHost coprocessorHost;
156
157  static void resetLogReaderClass() {
158    HLog.logReaderClass = null;
159  }
160
161  private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
162   // Minimum tolerable replicas; if the actual replica count drops below
163   // this value, rollWriter will be triggered.
164  private int minTolerableReplication;
165  private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
166  final static Object [] NO_ARGS = new Object []{};
167
168  public interface Reader {
169    void init(FileSystem fs, Path path, Configuration c) throws IOException;
170    void close() throws IOException;
171    Entry next() throws IOException;
172    Entry next(Entry reuse) throws IOException;
173    void seek(long pos) throws IOException;
174    long getPosition() throws IOException;
175    void reset() throws IOException;
176  }
177
178  public interface Writer {
179    void init(FileSystem fs, Path path, Configuration c) throws IOException;
180    void close() throws IOException;
181    void sync() throws IOException;
182    void append(Entry entry) throws IOException;
183    long getLength() throws IOException;
184  }
185
186  /*
187   * Current log file.
188   */
189  Writer writer;
190
191  /*
192   * Map of all log files but the current one.
193   */
194  final SortedMap<Long, Path> outputfiles =
195    Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
196
197  /*
198   * Map of encoded region names to their most recent sequence/edit id in their
199   * memstore.
200   */
201  private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
202    new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
203
204  private volatile boolean closed = false;
205
206  private final AtomicLong logSeqNum = new AtomicLong(0);
207
208  private boolean forMeta = false;
209
210  // The timestamp (in ms) when the log file was created.
211  private volatile long filenum = -1;
212
213   // Number of transactions in the current HLog.
214  private final AtomicInteger numEntries = new AtomicInteger(0);
215
216   // If the live datanode count is lower than the configured replica count,
217   // rollWriter would be requested on every sync (i.e. many rolls in a short
218   // time). This counter is used as a workaround to slow down the roll
219   // frequency triggered by checkLowReplication().
220  private volatile int consecutiveLogRolls = 0;
221  private final int lowReplicationRollLimit;
222
223  // If consecutiveLogRolls is larger than lowReplicationRollLimit,
224  // then disable the rolling in checkLowReplication().
225  // Enable it if the replications recover.
226  private volatile boolean lowReplicationRollEnabled = true;
227
228   // If the log grows larger than this size, roll it. This is typically
229   // 0.95 times the default HDFS block size.
230  private final long logrollsize;
231
232  // size of current log 
233  private long curLogSize = 0;
234
235  // The total size of hlog
236  private AtomicLong totalLogSize = new AtomicLong(0);
237  
238  // This lock prevents starting a log roll during a cache flush.
239  // synchronized is insufficient because a cache flush spans two method calls.
240  private final Lock cacheFlushLock = new ReentrantLock();
241
242  // We synchronize on updateLock to prevent updates and to prevent a log roll
243  // during an update
244  // locked during appends
245  private final Object updateLock = new Object();
246  private final Object flushLock = new Object();
247
248  private final boolean enabled;
249
250  /*
251   * If there are more than this many logs, force a flush of the oldest region
252   * so that its oldest edit goes to disk.  If we keep too many logs and crash,
253   * replay will take a long time.  Keep the number of logs tidy.
254   */
255  private final int maxLogs;
256
257  /**
258   * Thread that handles optional sync'ing
259   */
260  private final LogSyncer logSyncer;
261
262  /** Number of log close errors tolerated before we abort */
263  private final int closeErrorsTolerated;
264
265  private final AtomicInteger closeErrorCount = new AtomicInteger();
266
267  /**
268   * Pattern used to validate a HLog file name
269   */
270  private static final Pattern pattern = 
271      Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
272
273  static byte [] COMPLETE_CACHE_FLUSH;
274  static {
275    try {
276      COMPLETE_CACHE_FLUSH =
277        "HBASE::CACHEFLUSH".getBytes(HConstants.UTF8_ENCODING);
278    } catch (UnsupportedEncodingException e) {
279      assert(false);
280    }
281  }
282
283  public static class Metric {
284    public long min = Long.MAX_VALUE;
285    public long max = 0;
286    public long total = 0;
287    public int count = 0;
288
289    synchronized void inc(final long val) {
290      min = Math.min(min, val);
291      max = Math.max(max, val);
292      total += val;
293      ++count;
294    }
295
296    synchronized Metric get() {
297      Metric copy = new Metric();
298      copy.min = min;
299      copy.max = max;
300      copy.total = total;
301      copy.count = count;
302      this.min = Long.MAX_VALUE;
303      this.max = 0;
304      this.total = 0;
305      this.count = 0;
306      return copy;
307    }
308  }
309
310  // For measuring latency of writes
311  private static Metric writeTime = new Metric();
312  private static Metric writeSize = new Metric();
313  // For measuring latency of syncs
314  private static Metric syncTime = new Metric();
315  //For measuring slow HLog appends
316  private static AtomicLong slowHLogAppendCount = new AtomicLong();
317  private static Metric slowHLogAppendTime = new Metric();
318  
319  public static Metric getWriteTime() {
320    return writeTime.get();
321  }
322
323  public static Metric getWriteSize() {
324    return writeSize.get();
325  }
326
327  public static Metric getSyncTime() {
328    return syncTime.get();
329  }
330
331  public static long getSlowAppendCount() {
332    return slowHLogAppendCount.get();
333  }
334  
335  public static Metric getSlowAppendTime() {
336    return slowHLogAppendTime.get();
337  }
338
339  /**
340   * Constructor.
341   *
342   * @param fs filesystem handle
343   * @param dir path to where hlogs are stored
344   * @param oldLogDir path to where hlogs are archived
345   * @param conf configuration to use
346   * @throws IOException
347   */
348  public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
349              final Configuration conf)
350  throws IOException {
351    this(fs, dir, oldLogDir, conf, null, true, null, false);
352  }
353
354  /**
355   * Create an edit log at the given <code>dir</code> location.
356   *
357   * You should never have to load an existing log. If there is a log at
358   * startup, it should have already been processed and deleted by the time the
359   * HLog object is started up.
360   *
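   * <p>A minimal construction sketch (the paths shown are illustrative, not
   * required locations):
   * <pre>
   *   Configuration conf = HBaseConfiguration.create();
   *   FileSystem fs = FileSystem.get(conf);
   *   Path logDir = new Path("/hbase/.logs/myserver,60020,1");
   *   Path oldLogDir = new Path("/hbase/.oldlogs");
   *   HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, null);
   * </pre>
   *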
361   * @param fs filesystem handle
362   * @param dir path to where hlogs are stored
363   * @param oldLogDir path to where hlogs are archived
364   * @param conf configuration to use
365   * @param listeners Listeners on WAL events. Listeners passed here will
366   * be registered before we do anything else; e.g. before the
367   * constructor calls {@link #rollWriter()}.
368   * @param prefix should always be hostname and port in distributed env and
369   *        it will be URL encoded before being used.
370   *        If prefix is null, "hlog" will be used
371   * @throws IOException
372   */
373  public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
374      final Configuration conf, final List<WALActionsListener> listeners,
375      final String prefix) throws IOException {
376    this(fs, dir, oldLogDir, conf, listeners, true, prefix, false);
377  }
378
379  /**
380   * Create an edit log at the given <code>dir</code> location.
381   *
382   * You should never have to load an existing log. If there is a log at
383   * startup, it should have already been processed and deleted by the time the
384   * HLog object is started up.
385   *
386   * @param fs filesystem handle
387   * @param dir path to where hlogs are stored
388   * @param oldLogDir path to where hlogs are archived
389   * @param conf configuration to use
390   * @param listeners Listeners on WAL events. Listeners passed here will
391   * be registered before we do anything else; e.g. before the
392   * constructor calls {@link #rollWriter()}.
393   * @param failIfLogDirExists If true, an IOException will be thrown if dir already exists.
394   * @param prefix should always be hostname and port in distributed env and
395   *        it will be URL encoded before being used.
396   *        If prefix is null, "hlog" will be used
397   * @param forMeta if this hlog is meant for meta updates
398   * @throws IOException
399   */
400  public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
401      final Configuration conf, final List<WALActionsListener> listeners,
402      final boolean failIfLogDirExists, final String prefix, boolean forMeta)
403  throws IOException {
404    super();
405    this.fs = fs;
406    this.dir = dir;
407    this.conf = conf;
408    this.hlogFs = new HLogFileSystem(conf);
409    if (listeners != null) {
410      for (WALActionsListener i: listeners) {
411        registerWALActionsListener(i);
412      }
413    }
414    this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
415        FSUtils.getDefaultBlockSize(this.fs, this.dir));
416    // Roll at 95% of block size.
417    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
418    this.logrollsize = (long)(this.blocksize * multi);
419    this.optionalFlushInterval =
420      conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
421    boolean dirExists = false;
422    if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
423      throw new IOException("Target HLog directory already exists: " + dir);
424    }
425    if (!dirExists && !HBaseFileSystem.makeDirOnFileSystem(fs, dir)) {
426      throw new IOException("Unable to mkdir " + dir);
427    }
428    this.oldLogDir = oldLogDir;
429    if (!fs.exists(oldLogDir) && !HBaseFileSystem.makeDirOnFileSystem(fs, oldLogDir)) {
430      throw new IOException("Unable to mkdir " + this.oldLogDir);
431    }
432    this.forMeta = forMeta;
433    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
434    this.minTolerableReplication = conf.getInt(
435        "hbase.regionserver.hlog.tolerable.lowreplication",
436        FSUtils.getDefaultReplication(this.fs, this.dir));
437    this.lowReplicationRollLimit = conf.getInt(
438        "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
439    this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
440    this.closeErrorsTolerated = conf.getInt(
441        "hbase.regionserver.logroll.errors.tolerated", 0);
442
443    LOG.info("HLog configuration: blocksize=" +
444      StringUtils.byteDesc(this.blocksize) +
445      ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
446      ", enabled=" + this.enabled +
447      ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
448    // If prefix is null||empty then just name it hlog
449    this.prefix = prefix == null || prefix.isEmpty() ?
450        "hlog" : URLEncoder.encode(prefix, "UTF8");
451    // rollWriter sets this.hdfs_out if it can.
452    rollWriter();
453
454    // handle the reflection necessary to call getNumCurrentReplicas()
455    this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
456
457    logSyncer = new LogSyncer(this.optionalFlushInterval);
458    // When optionalFlushInterval is set as 0, don't start a thread for deferred log sync.
459    if (this.optionalFlushInterval > 0) {
460      Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
461          + ".logSyncer");
462    } else {
463      LOG.info("hbase.regionserver.optionallogflushinterval is set to "
464          + this.optionalFlushInterval + ". Deferred log syncing won't work. "
465          + "Any Mutation marked for deferred sync will be flushed immediately.");
466    }
467    coprocessorHost = new WALCoprocessorHost(this, conf);
468  }
469
470  /**
471   * Find the 'getNumCurrentReplicas' method on the passed <code>os</code> stream.
472   * @return Method or null.
473   */
474  private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
475    Method m = null;
476    if (os != null) {
477      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
478          .getClass();
479      try {
480        m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
481            new Class<?>[] {});
482        m.setAccessible(true);
483      } catch (NoSuchMethodException e) {
484        LOG.info("FileSystem's output stream doesn't support"
485            + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
486            + wrappedStreamClass.getName());
487      } catch (SecurityException e) {
488        LOG.info("Doesn't have access to getNumCurrentReplicas on "
489            + "FileSystems's output stream --HDFS-826 not available; fsOut="
490            + wrappedStreamClass.getName(), e);
491        m = null; // could happen on setAccessible()
492      }
493    }
494    if (m != null) {
495      LOG.info("Using getNumCurrentReplicas--HDFS-826");
496    }
497    return m;
498  }
499
500  public void registerWALActionsListener(final WALActionsListener listener) {
501    this.listeners.add(listener);
502  }
503
504  public boolean unregisterWALActionsListener(final WALActionsListener listener) {
505    return this.listeners.remove(listener);
506  }
507
508  /**
509   * @return Current state of the monotonically increasing file id.
510   */
511  public long getFilenum() {
512    return this.filenum;
513  }
514
515  /**
516   * Called by HRegionServer when it opens a new region to ensure that log
517   * sequence numbers are always greater than the latest sequence number of the
518   * region being brought on-line.
519   *
520   * @param newvalue We'll set log edit/sequence number to this value if it
521   * is greater than the current value.
522   */
523  public void setSequenceNumber(final long newvalue) {
524    for (long id = this.logSeqNum.get(); id < newvalue &&
525        !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
526      // This could spin on occasion but better the occasional spin than locking
527      // every increment of sequence number.
528      LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
529    }
530  }
531
532  /**
533   * @return log sequence number
534   */
535  public long getSequenceNumber() {
536    return logSeqNum.get();
537  }
538
539  /**
540   * Method used internally by this class and for tests only.
541   * @return The wrapped stream our writer is using; it's not the
542   * writer's 'out' FSDataOutputStream but the stream that this 'out' wraps
543   * (in HDFS it's an instance of DFSDataOutputStream).
544   */
545  // usage: see TestLogRolling.java
546  OutputStream getOutputStream() {
547    return this.hdfs_out.getWrappedStream();
548  }
549
550  /**
551   * Roll the log writer. That is, start writing log messages to a new file.
552   *
553   * Because a log cannot be rolled during a cache flush, and a cache flush
554   * spans two method calls, a special lock needs to be obtained so that a cache
555   * flush cannot start when the log is being rolled and the log cannot be
556   * rolled during a cache flush.
557   *
558   * <p>Note that this method cannot be synchronized because it is possible that
559   * startCacheFlush runs, obtaining the cacheFlushLock, and then this method
560   * starts, obtaining the lock on this object but blocking on the cacheFlushLock;
561   * completeCacheFlush could then be called, waiting for the lock on this object
562   * and consequently never releasing the cacheFlushLock.
563   *
564   * @return If lots of logs, flush the returned regions so next time through
565   * we can clean logs. Returns null if nothing to flush.  Names are encoded
566   * region names as returned by {@link HRegionInfo#getEncodedName()}.
567   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
568   * @throws IOException
569   */
570  public byte [][] rollWriter() throws FailedLogCloseException, IOException {
571    return rollWriter(false);
572  }
573
574  /**
575   * Roll the log writer. That is, start writing log messages to a new file.
576   *
577   * Because a log cannot be rolled during a cache flush, and a cache flush
578   * spans two method calls, a special lock needs to be obtained so that a cache
579   * flush cannot start when the log is being rolled and the log cannot be
580   * rolled during a cache flush.
581   *
582   * <p>Note that this method cannot be synchronized because it is possible that
583   * startCacheFlush runs, obtaining the cacheFlushLock, and then this method
584   * starts, obtaining the lock on this object but blocking on the cacheFlushLock;
585   * completeCacheFlush could then be called, waiting for the lock on this object
586   * and consequently never releasing the cacheFlushLock.
587   *
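   * <p>A minimal sketch of handling the return value (the flush call is a
   * stand-in for whatever mechanism the caller uses to flush a region):
   * <pre>
   *   byte[][] regionsToFlush = hlog.rollWriter(true);
   *   if (regionsToFlush != null) {
   *     for (byte[] encodedRegionName : regionsToFlush) {
   *       flushRegion(encodedRegionName);   // hypothetical caller-side flush
   *     }
   *   }
   * </pre>
   *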
588   * @param force If true, force creation of a new writer even if no entries
589   * have been written to the current writer
590   * @return If lots of logs, flush the returned regions so next time through
591   * we can clean logs. Returns null if nothing to flush.  Names are encoded
592   * region names as returned by {@link HRegionInfo#getEncodedName()}.
593   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
594   * @throws IOException
595   */
596  public byte [][] rollWriter(boolean force)
597      throws FailedLogCloseException, IOException {
598    // Return if nothing to flush.
599    if (!force && this.writer != null && this.numEntries.get() <= 0) {
600      return null;
601    }
602    byte [][] regionsToFlush = null;
603    this.cacheFlushLock.lock();
604    this.logRollRunning = true;
605    try {
606      if (closed) {
607        LOG.debug("HLog closed.  Skipping rolling of writer");
608        return regionsToFlush;
609      }
610      // Do all the preparation outside of the updateLock to block
611      // incoming writes as little as possible.
612      long currentFilenum = this.filenum;
613      Path oldPath = null;
614      if (currentFilenum > 0) {
615        // computeFilename will take care of the meta hlog filename
616        oldPath = computeFilename(currentFilenum);
617      }
618      this.filenum = System.currentTimeMillis();
619      Path newPath = computeFilename();
620
621      // Tell our listeners that a new log is about to be created
622      if (!this.listeners.isEmpty()) {
623        for (WALActionsListener i : this.listeners) {
624          i.preLogRoll(oldPath, newPath);
625        }
626      }
627      HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
628      // Can we get at the dfsclient outputstream?  If an instance of
629      // SFLW, it'll have done the necessary reflection to get at the
630      // protected field name.
631      FSDataOutputStream nextHdfsOut = null;
632      if (nextWriter instanceof SequenceFileLogWriter) {
633        nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
634        // perform the costly sync before we get the lock to roll writers.
635        try {
636          nextWriter.sync();
637        } catch (IOException e) {
638          // optimization failed, no need to abort here.
639          LOG.warn("pre-sync failed", e);
640        }
641      }
642
643      synchronized (updateLock) {
644        // Clean up current writer.
645        Path oldFile = cleanupCurrentWriter(currentFilenum);
646        this.writer = nextWriter;
647        this.hdfs_out = nextHdfsOut;
648
649        long oldFileLen = 0;
650        if (oldFile != null) {
651          oldFileLen = this.fs.getFileStatus(oldFile).getLen();
652          this.totalLogSize.addAndGet(oldFileLen);
653        }
654        LOG.info((oldFile != null?
655            "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
656            this.numEntries.get() +
657            ", filesize=" + oldFileLen + ". ": "") +
658          " for " + FSUtils.getPath(newPath));
659        this.numEntries.set(0);
660      }
661      // Tell our listeners that a new log was created
662      if (!this.listeners.isEmpty()) {
663        for (WALActionsListener i : this.listeners) {
664          i.postLogRoll(oldPath, newPath);
665        }
666      }
667
668      // Can we delete any of the old log files?
669      if (this.outputfiles.size() > 0) {
670        if (this.lastSeqWritten.isEmpty()) {
671          LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
672          // If so, then no new writes have come in since all regions were
673          // flushed (and removed from the lastSeqWritten map). Means can
674          // remove all but currently open log file.
675          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
676            Path path = e.getValue();
677            this.totalLogSize.addAndGet(-this.fs.getFileStatus(path).getLen());
678            archiveLogFile(path, e.getKey());
679          }
680          this.outputfiles.clear();
681        } else {
682          regionsToFlush = cleanOldLogs();
683        }
684      }
685    } finally {
686      this.logRollRunning = false;
687      this.cacheFlushLock.unlock();
688    }
689    return regionsToFlush;
690  }
691
692  /**
693   * This method allows subclasses to inject different writers without having to
694   * override other methods like rollWriter().
695   * 
696   * @param fs
697   * @param path
698   * @param conf
699   * @return Writer instance
700   * @throws IOException
701   */
702  protected Writer createWriterInstance(final FileSystem fs, final Path path,
703      final Configuration conf) throws IOException {
704    if (forMeta) {
705      //TODO: set a higher replication for the hlog files (HBASE-6773)
706    }
707    return this.hlogFs.createWriter(fs, conf, path);
708  }
709
710  /**
711   * Get a reader for the WAL.
712   * The proper way to tail a log that can be under construction is to first use this method
713   * to get a reader then call {@link HLog.Reader#reset()} to see the new data. It will also
714   * take care of keeping implementation-specific context (like compression).
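   * <p>A minimal tailing sketch along those lines (<code>tailing</code> and
   * <code>process()</code> are caller-defined and only illustrative):
   * <pre>
   *   HLog.Reader reader = HLog.getReader(fs, walPath, conf);
   *   HLog.Entry entry;
   *   while (tailing) {
   *     while ((entry = reader.next()) != null) {
   *       process(entry);
   *     }
   *     reader.reset();   // pick up edits appended since the last read
   *   }
   *   reader.close();
   * </pre>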
715   * @param fs
716   * @param path
717   * @param conf
718   * @return A WAL reader.  Close when done with it.
719   * @throws IOException
720   */
721  public static Reader getReader(final FileSystem fs, final Path path,
722                                 Configuration conf)
723      throws IOException {
724    try {
726      if (logReaderClass == null) {
728        logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
729            SequenceFileLogReader.class, Reader.class);
730      }
733      HLog.Reader reader = logReaderClass.newInstance();
734      reader.init(fs, path, conf);
735      return reader;
736    } catch (IOException e) {
737      throw e;
738    } catch (Exception e) {
740      throw new IOException("Cannot get log reader", e);
741    }
742  }
743
744  /**
745   * Get a writer for the WAL.
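   * <p>A minimal writing sketch (the key and edit are assumed to be built by
   * the caller; normal clients go through <code>append</code> instead of a
   * raw Writer):
   * <pre>
   *   HLog.Writer writer = HLog.createWriter(fs, walPath, conf);
   *   writer.append(new HLog.Entry(logKey, walEdit));
   *   writer.sync();
   *   writer.close();
   * </pre>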
746   * @param path
747   * @param conf
748   * @return A WAL writer.  Close when done with it.
749   * @throws IOException
750   */
751  public static Writer createWriter(final FileSystem fs,
752      final Path path, Configuration conf)
753  throws IOException {
754    try {
755      if (logWriterClass == null) {
756        logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
757            SequenceFileLogWriter.class, Writer.class);
758      }
759      HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
760      writer.init(fs, path, conf);
761      return writer;
762    } catch (Exception e) {
763      throw new IOException("cannot get log writer", e);
764    }
765  }
766
767  /*
768   * Clean up old commit logs.
769   * @return If lots of logs, flush the returned region so next time through
770   * we can clean logs. Returns null if nothing to flush.  Returns array of
771   * encoded region names to flush.
772   * @throws IOException
773   */
774  private byte [][] cleanOldLogs() throws IOException {
775    Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
776    // Get the set of all log files whose last sequence number is smaller than
777    // the oldest edit's sequence number.
778    TreeSet<Long> sequenceNumbers =
779      new TreeSet<Long>(this.outputfiles.headMap(
780        (Long.valueOf(oldestOutstandingSeqNum.longValue()))).keySet());
781    // Now remove old log files (if any)
782    int logsToRemove = sequenceNumbers.size();
783    if (logsToRemove > 0) {
784      if (LOG.isDebugEnabled()) {
785        // Find associated region; helps debugging.
786        byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
787        LOG.debug("Found " + logsToRemove + " hlogs to remove" +
788          " out of total " + this.outputfiles.size() + ";" +
789          " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
790          " from region " + Bytes.toStringBinary(oldestRegion));
791      }
792      for (Long seq : sequenceNumbers) {
793        archiveLogFile(this.outputfiles.remove(seq), seq);
794      }
795    }
796
797    // If too many log files, figure which regions we need to flush.
798    // Array is an array of encoded region names.
799    byte [][] regions = null;
800    int logCount = this.outputfiles == null? 0: this.outputfiles.size();
801    if (logCount > this.maxLogs && logCount > 0) {
802      // This is an array of encoded region names.
803      regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
804        this.lastSeqWritten);
805      if (regions != null) {
806        StringBuilder sb = new StringBuilder();
807        for (int i = 0; i < regions.length; i++) {
808          if (i > 0) sb.append(", ");
809          sb.append(Bytes.toStringBinary(regions[i]));
810        }
811        LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
812           this.maxLogs + "; forcing flush of " + regions.length + " region(s): " +
813           sb.toString());
814      }
815    }
816    return regions;
817  }
818
819  /**
820   * Return regions (memstores) that have edits that are equal to or older than
821   * the passed <code>oldestWALseqid</code>.
822   * @param oldestWALseqid
823   * @param regionsToSeqids Encoded region names to sequence ids
824   * @return All regions whose seqid is <= <code>oldestWALseqid</code> (not
825   * necessarily in order).  Null if no regions found.
826   */
827  static byte [][] findMemstoresWithEditsEqualOrOlderThan(final long oldestWALseqid,
828      final Map<byte [], Long> regionsToSeqids) {
829    // This method is static so it can be unit tested more easily.
830    List<byte []> regions = null;
831    for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
832      if (e.getValue().longValue() <= oldestWALseqid) {
833        if (regions == null) regions = new ArrayList<byte []>();
834        // Key is encoded region name.
835        regions.add(e.getKey());
836      }
837    }
838    return regions == null?
839      null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
840  }
841
842  /*
843   * @return Logs older than this id are safe to remove.
844   */
845  private Long getOldestOutstandingSeqNum() {
846    return Collections.min(this.lastSeqWritten.values());
847  }
848
849  /**
850   * @param oldestOutstandingSeqNum
851   * @return (Encoded) name of oldest outstanding region.
852   */
853  private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
854    byte [] oldestRegion = null;
855    for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
856      if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
857        // Key is encoded region name.
858        oldestRegion = e.getKey();
859        break;
860      }
861    }
862    return oldestRegion;
863  }
864
865  /*
866   * Cleans up the current writer, closing it and adding it to outputfiles.
867   * Presumes we're operating inside an updateLock scope.
868   * @return Path to current writer or null if none.
869   * @throws IOException
870   */
871  Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
872    Path oldFile = null;
873    if (this.writer != null) {
874      // Close the current writer, get a new one.
875      try {
876        // Wait till all current transactions are written to the hlog.
877        // No new transactions can occur because we have the updatelock.
878        if (this.unflushedEntries.get() != this.syncedTillHere) {
879          LOG.debug("cleanupCurrentWriter " +
880                   " waiting for transactions to get synced " +
881                   " total " + this.unflushedEntries.get() +
882                   " synced till here " + syncedTillHere);
883          sync();
884        }
885        this.writer.close();
886        this.writer = null;
887        closeErrorCount.set(0);
888      } catch (IOException e) {
889        LOG.error("Failed close of HLog writer", e);
890        int errors = closeErrorCount.incrementAndGet();
891        if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
892          LOG.warn("Riding over HLog close failure! error count="+errors);
893        } else {
894          if (hasDeferredEntries()) {
895            LOG.error("Aborting due to unflushed edits in HLog");
896          }
897          // Failed close of log file.  Means we're losing edits.  For now,
898          // shut ourselves down to minimize loss.  Alternative is to try and
899          // keep going.  See HBASE-930.
900          FailedLogCloseException flce =
901            new FailedLogCloseException("#" + currentfilenum);
902          flce.initCause(e);
903          throw flce;
904        }
905      }
906      if (currentfilenum >= 0) {
907        oldFile = computeFilename(currentfilenum);
908        this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
909      }
910    }
911    return oldFile;
912  }
913
914  private void archiveLogFile(final Path p, final Long seqno) throws IOException {
915    Path newPath = getHLogArchivePath(this.oldLogDir, p);
916    LOG.info("moving old hlog file " + FSUtils.getPath(p) +
917      " whose highest sequenceid is " + seqno + " to " +
918      FSUtils.getPath(newPath));
919
920    // Tell our listeners that a log is going to be archived.
921    if (!this.listeners.isEmpty()) {
922      for (WALActionsListener i : this.listeners) {
923        i.preLogArchive(p, newPath);
924      }
925    }
926    if (!HBaseFileSystem.renameAndSetModifyTime(this.fs, p, newPath)) {
927      throw new IOException("Unable to rename " + p + " to " + newPath);
928    }
929    // Tell our listeners that a log has been archived.
930    if (!this.listeners.isEmpty()) {
931      for (WALActionsListener i : this.listeners) {
932        i.postLogArchive(p, newPath);
933      }
934    }
935  }
936
937  /**
938   * This is a convenience method that computes a new filename
939   * using the current HLog file-number.
940   * @return Path
941   */
942  protected Path computeFilename() {
943    return computeFilename(this.filenum);
944  }
945
946  /**
947   * This is a convenience method that computes a new filename with a given
948   * file-number.
949   * @param filenum to use
950   * @return Path
951   */
952  protected Path computeFilename(long filenum) {
953    if (filenum < 0) {
954      throw new RuntimeException("hlog file number can't be < 0");
955    }
956    String child = prefix + "." + filenum;
957    if (forMeta) {
958      child += HLog.META_HLOG_FILE_EXTN;
959    }
960    return new Path(dir, child);
961  }
962
963  public static boolean isMetaFile(Path p) {
964    if (p.getName().endsWith(HLog.META_HLOG_FILE_EXTN)) {
965      return true;
966    }
967    return false;
968  }
969
970  /**
971   * Shut down the log and delete the log directory
972   *
973   * @throws IOException
974   */
975  public void closeAndDelete() throws IOException {
976    close();
977    if (!fs.exists(this.dir)) return;
978    FileStatus[] files = fs.listStatus(this.dir);
979    for(FileStatus file : files) {
980
981      Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
982      // Tell our listeners that a log is going to be archived.
983      if (!this.listeners.isEmpty()) {
984        for (WALActionsListener i : this.listeners) {
985          i.preLogArchive(file.getPath(), p);
986        }
987      }
988      if (!HBaseFileSystem.renameAndSetModifyTime(fs, file.getPath(), p)) {
989        throw new IOException("Unable to rename " + file.getPath() + " to " + p);
990      }
991      // Tell our listeners that a log was archived.
992      if (!this.listeners.isEmpty()) {
993        for (WALActionsListener i : this.listeners) {
994          i.postLogArchive(file.getPath(), p);
995        }
996      }
997    }
998    LOG.debug("Moved " + files.length + " log files to " +
999      FSUtils.getPath(this.oldLogDir));
1000    if (!HBaseFileSystem.deleteDirFromFileSystem(fs, dir)) {
1001      LOG.info("Unable to delete " + dir);
1002    }
1003  }
1004
1005  /**
1006   * Shut down the log.
1007   *
1008   * @throws IOException
1009   */
1010  public void close() throws IOException {
1011    // When optionalFlushInterval is 0, the logSyncer is not started as a Thread.
1012    if (this.optionalFlushInterval > 0) {
1013      try {
1014        logSyncer.close();
1015        // Make sure we synced everything
1016        logSyncer.join(this.optionalFlushInterval * 2);
1017      } catch (InterruptedException e) {
1018        LOG.error("Exception while waiting for syncer thread to die", e);
1019      }
1020    }
1021
1022    cacheFlushLock.lock();
1023    try {
1024      // Tell our listeners that the log is closing
1025      if (!this.listeners.isEmpty()) {
1026        for (WALActionsListener i : this.listeners) {
1027          i.logCloseRequested();
1028        }
1029      }
1030      synchronized (updateLock) {
1031        this.closed = true;
1032        if (LOG.isDebugEnabled()) {
1033          LOG.debug("closing hlog writer in " + this.dir.toString());
1034        }
1035        if (this.writer != null) {
1036          this.writer.close();
1037        }
1038      }
1039    } finally {
1040      cacheFlushLock.unlock();
1041    }
1042  }
1043
1044  /**
1045   * @param regionName
1046   * @param tableName
1047   * @param seqnum
1048   * @param now
   * @param clusterId
1049   * @return New log key.
1050   */
1051  protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum,
1052      long now, UUID clusterId) {
1053    return new HLogKey(regionName, tableName, seqnum, now, clusterId);
1054  }
1055
1056
1057  /** Append an entry to the log.
1058   *
1059   * @param regionInfo
1060   * @param logKey
1061   * @param logEdit
   * @param htd
1062   * @param doSync shall we sync after writing the transaction
1063   * @return The txid of this transaction
1064   * @throws IOException
1065   */
1066  public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
1067                     HTableDescriptor htd, boolean doSync)
1068  throws IOException {
1069    if (this.closed) {
1070      throw new IOException("Cannot append; log is closed");
1071    }
1072    long txid = 0;
1073    synchronized (updateLock) {
1074      long seqNum = obtainSeqNum();
1075      logKey.setLogSeqNum(seqNum);
1076      // The 'lastSeqWritten' map holds the sequence number of the oldest
1077      // write for each region (i.e. the first edit added to the particular
1078      // memstore). When the cache is flushed, the entry for the
1079      // region being flushed is removed if the sequence number of the flush
1080      // is greater than or equal to the value in lastSeqWritten.
1081      this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
1082        Long.valueOf(seqNum));
1083      doWrite(regionInfo, logKey, logEdit, htd);
1084      txid = this.unflushedEntries.incrementAndGet();
1085      this.numEntries.incrementAndGet();
1086      if (htd.isDeferredLogFlush()) {
1087        lastDeferredTxid = txid;
1088      }
1089    }
1090
1091    // Sync if this is a catalog region; otherwise, sync unless the table
1092    // has deferred log flushing enabled
1093    if (doSync &&
1094        (regionInfo.isMetaRegion() ||
1095        !htd.isDeferredLogFlush())) {
1096      // sync txn to file system
1097      this.sync(txid);
1098    }
1099    return txid;
1100  }
1101
1102  /**
1103   * Only used in tests.
1104   *
1105   * @param info
1106   * @param tableName
1107   * @param edits
1108   * @param now
1109   * @param htd
1110   * @throws IOException
1111   */
1112  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
1113    final long now, HTableDescriptor htd)
1114  throws IOException {
1115    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
1116  }
1117
1118  /**
1119   * Append a set of edits to the log. Log edits are keyed by (encoded)
1120   * regionName, rowname, and log-sequence-id.
1121   *
1122   * Later, if we sort by these keys, we obtain all the relevant edits for a
1123   * given key-range of the HRegion (TODO). Any edits that do not have a
1124   * matching COMPLETE_CACHEFLUSH message can be discarded.
1125   *
1126   * <p>
1127   * Logs cannot be restarted once closed, or once the HLog process dies. Each
1128   * time the HLog starts, it must create a new log. This means that other
1129   * systems should process the log appropriately upon each startup (and prior
1130   * to initializing HLog).
1131   *
1132   * synchronized prevents appends during the completion of a cache flush or for
1133   * the duration of a log roll.
1134   *
1135   * @param info
1136   * @param tableName
1137   * @param edits
1138   * @param clusterId The originating clusterId for this edit (for replication)
1139   * @param now
1140   * @param doSync shall we sync?
1141   * @return txid of this transaction
1142   * @throws IOException
1143   */
1144  private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
1145      final long now, HTableDescriptor htd, boolean doSync)
1146    throws IOException {
1147      if (edits.isEmpty()) return this.unflushedEntries.get();
1148      if (this.closed) {
1149        throw new IOException("Cannot append; log is closed");
1150      }
1151      long txid = 0;
1152      synchronized (this.updateLock) {
1153        long seqNum = obtainSeqNum();
1154        // The 'lastSeqWritten' map holds the sequence number of the oldest
1155        // write for each region (i.e. the first edit added to the particular
1156        // memstore).  When the cache is flushed, the entry for the
1157        // region being flushed is removed if the sequence number of the flush
1158        // is greater than or equal to the value in lastSeqWritten.
1159        // Use the encoded name.  It's shorter, guaranteed unique, and a subset
1160        // of the actual name.
1161        byte [] encodedRegionName = info.getEncodedNameAsBytes();
1162        this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
1163        HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
1164        doWrite(info, logKey, edits, htd);
1165        this.numEntries.incrementAndGet();
1166        txid = this.unflushedEntries.incrementAndGet();
1167        if (htd.isDeferredLogFlush()) {
1168          lastDeferredTxid = txid;
1169        }
1170      }
1171      // Sync if this is a catalog region; otherwise, sync unless the table
1172      // has deferred log flushing enabled
1173      if (doSync && 
1174          (info.isMetaRegion() ||
1175          !htd.isDeferredLogFlush())) {
1176        // sync txn to file system
1177        this.sync(txid);
1178      }
1179      return txid;
1180    }
1181
1182  /**
1183   * Append a set of edits to the log. Log edits are keyed by (encoded)
1184   * regionName, rowname, and log-sequence-id. The HLog is not flushed
1185   * after this transaction is written to the log.
1186   *
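   * <p>A minimal sketch of the deferred pattern (the region info, edits and
   * table descriptor are assumed to be built by the caller):
   * <pre>
   *   long txid = hlog.appendNoSync(regionInfo, tableName, edits,
   *       HConstants.DEFAULT_CLUSTER_ID, System.currentTimeMillis(), htd);
   *   // ... possibly batch further appends ...
   *   hlog.sync(txid);   // flushes everything up to and including txid
   * </pre>
   *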
1187   * @param info
1188   * @param tableName
1189   * @param edits
1190   * @param clusterId The originating clusterId for this edit (for replication)
1191   * @param now
1192   * @return txid of this transaction
1193   * @throws IOException
1194   */
1195  public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, 
1196    UUID clusterId, final long now, HTableDescriptor htd)
1197    throws IOException {
1198    return append(info, tableName, edits, clusterId, now, htd, false);
1199  }
1200
1201  /**
1202   * Append a set of edits to the log. Log edits are keyed by (encoded)
1203   * regionName, rowname, and log-sequence-id. The HLog is flushed
1204   * after this transaction is written to the log.
1205   *
1206   * @param info
1207   * @param tableName
1208   * @param edits
1209   * @param clusterId The originating clusterId for this edit (for replication)
1210   * @param now
1211   * @return txid of this transaction
1212   * @throws IOException
1213   */
1214  public long append(HRegionInfo info, byte [] tableName, WALEdit edits, 
1215    UUID clusterId, final long now, HTableDescriptor htd)
1216    throws IOException {
1217    return append(info, tableName, edits, clusterId, now, htd, true);
1218  }
1219
1220  /**
1221   * This class is responsible for holding the HLog's appended Entry list
1222   * and for syncing them according to a configurable interval.
1223   *
1224   * Deferred log flushing works by piggybacking on this process: the appended
1225   * Entry is simply not sync'd immediately. It can also be sync'd as a side
1226   * effect of non-deferred entries being sync'd outside of this thread.
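   *
   * <p>A minimal sketch of how this path is exercised (the config key is the
   * one read in the HLog constructor; setDeferredLogFlush is assumed to be
   * available on this version's HTableDescriptor):
   * <pre>
   *   // hbase-site.xml: sync deferred edits roughly once a second
   *   //   hbase.regionserver.optionallogflushinterval = 1000
   *   HTableDescriptor htd = new HTableDescriptor("t1");
   *   htd.setDeferredLogFlush(true);   // appends return before the WAL sync
   * </pre>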
1227   */
1228  class LogSyncer extends HasThread {
1229
1230    private final long optionalFlushInterval;
1231
1232    private AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
1233
1234    // List of pending writes to the HLog. These correspond to transactions
1235    // that have not yet returned to the client. We keep them cached here
1236    // instead of writing them to HDFS piecemeal, because the HDFS write 
1237    // method is pretty heavyweight as far as locking is concerned. The 
1238    // goal is to increase the batchsize for writing-to-hdfs as well as
1239    // sync-to-hdfs, so that we can get better system throughput.
1240    private List<Entry> pendingWrites = new LinkedList<Entry>();
1241
1242    LogSyncer(long optionalFlushInterval) {
1243      this.optionalFlushInterval = optionalFlushInterval;
1244    }
1245
1246    @Override
1247    public void run() {
1248      try {
1249        // awaiting with a timeout doesn't always
1250        // throw exceptions on interrupt
1251        while(!this.isInterrupted() && !closeLogSyncer.get()) {
1252
1253          try {
1254            if (unflushedEntries.get() <= syncedTillHere) {
1255              synchronized (closeLogSyncer) {
1256                closeLogSyncer.wait(this.optionalFlushInterval);
1257              }
1258            }
1259            // Calling sync since we waited or had unflushed entries.
1260            // Entries appended but not sync'd are taken care of here AKA
1261            // deferred log flush
1262            sync();
1263          } catch (IOException e) {
1264            LOG.error("Error while syncing, requesting close of hlog ", e);
1265            requestLogRoll();
1266          }
1267        }
1268      } catch (InterruptedException e) {
1269        LOG.debug(getName() + " interrupted while waiting for sync requests");
1270      } finally {
1271        LOG.info(getName() + " exiting");
1272      }
1273    }
1274
1275    // appends new writes to the pendingWrites. It is better to keep it in
1276    // our own queue rather than writing it to the HDFS output stream because
1277    // HDFSOutputStream.writeChunk is not lightweight at all.
1278    synchronized void append(Entry e) throws IOException {
1279      pendingWrites.add(e);
1280    }
1281
1282    // Returns all currently pending writes. New writes
1283    // will accumulate in a new list.
1284    synchronized List<Entry> getPendingWrites() {
1285      List<Entry> save = this.pendingWrites;
1286      this.pendingWrites = new LinkedList<Entry>();
1287      return save;
1288    }
1289
1290    // writes out pending entries to the HLog
1291    void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
1292      if (pending == null) return;
1293
1294      // write out all accumulated Entries to hdfs.
1295      for (Entry e : pending) {
1296        writer.append(e);
1297      }
1298    }
1299
1300    void close() {
1301      synchronized (closeLogSyncer) {
1302        closeLogSyncer.set(true);
1303        closeLogSyncer.notifyAll();
1304      }
1305    }
1306  }
1307
1308  // sync all known transactions
1309  private void syncer() throws IOException {
1310    syncer(this.unflushedEntries.get()); // sync all pending items
1311  }
1312
1313  // sync all transactions up to the specified txid
1314  private void syncer(long txid) throws IOException {
1315    // if the transaction that we are interested in is already
1316    // synced, then return immediately.
1317    if (txid <= this.syncedTillHere) {
1318      return;
1319    }
1320    Writer tempWriter;
1321    synchronized (this.updateLock) {
1322      if (this.closed) return;
1323      tempWriter = this.writer; // guaranteed non-null
1324    }
1325    try {
1326      long doneUpto;
1327      long now = System.currentTimeMillis();
1328      // First flush all the pending writes to HDFS. Then 
1329      // issue the sync to HDFS. If sync is successful, then update
1330      // syncedTillHere to indicate that transactions up to this
1331      // number have been successfully synced.
1332      IOException ioe = null;
1333      List<Entry> pending = null;
1334      synchronized (flushLock) {
1335        if (txid <= this.syncedTillHere) {
1336          return;
1337        }
1338        doneUpto = this.unflushedEntries.get();
1339        pending = logSyncer.getPendingWrites();
1340        try {
1341          logSyncer.hlogFlush(tempWriter, pending);
1342        } catch(IOException io) {
1343          ioe = io;
1344          LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
1345        }
1346      }
1347      if (ioe != null && pending != null) {
1348        synchronized (this.updateLock) {
1349          synchronized (flushLock) {
1350            // HBASE-4387, HBASE-5623, retry with updateLock held
1351            tempWriter = this.writer;
1352            logSyncer.hlogFlush(tempWriter, pending);
1353          }
1354        }
1355      }
1356      // Another thread might have sync'ed; avoid double-sync'ing
1357      if (txid <= this.syncedTillHere) {
1358        return;
1359      }
1360      try {
1361        tempWriter.sync();
1362      } catch (IOException io) {
1363        synchronized (this.updateLock) {
1364          // HBASE-4387, HBASE-5623, retry with updateLock held
1365          tempWriter = this.writer;
1366          tempWriter.sync();
1367        }
1368      }
1369      this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
1370
1371      syncTime.inc(System.currentTimeMillis() - now);
1372      if (!this.logRollRunning) {
1373        checkLowReplication();
1374        try {
1375          curLogSize = tempWriter.getLength();
1376          if (curLogSize > this.logrollsize) {
1377            requestLogRoll();
1378          }
1379        } catch (IOException x) {
1380          LOG.debug("Log roll failed and will be retried. (This is not an error)");
1381        }
1382      }
1383    } catch (IOException e) {
1384      LOG.fatal("Could not sync. Requesting close of hlog", e);
1385      requestLogRoll();
1386      throw e;
1387    }
1388  }
1389
1390  private void checkLowReplication() {
1391    // if the number of replicas in HDFS has fallen below the configured
1392    // value, then roll logs.
1393    try {
1394      int numCurrentReplicas = getLogReplication();
1395      if (numCurrentReplicas != 0
1396          && numCurrentReplicas < this.minTolerableReplication) {
1397        if (this.lowReplicationRollEnabled) {
1398          if (this.consecutiveLogRolls < this.lowReplicationRollLimit) {
1399            LOG.warn("HDFS pipeline error detected. " + "Found "
1400                + numCurrentReplicas + " replicas but expecting no less than "
1401                + this.minTolerableReplication + " replicas. "
1402                + " Requesting close of hlog.");
1403            requestLogRoll();
1404            // If rollWriter is requested, increase consecutiveLogRolls. Once it
1405            // is larger than lowReplicationRollLimit, disable the
1406            // LowReplication-Roller
1407            this.consecutiveLogRolls++;
1408          } else {
1409            LOG.warn("Too many consecutive RollWriter requests; it's a sign that "
1410                + "the total number of live datanodes is lower than the tolerable number of replicas.");
1411            this.consecutiveLogRolls = 0;
1412            this.lowReplicationRollEnabled = false;
1413          }
1414        }
1415      } else if (numCurrentReplicas >= this.minTolerableReplication) {
1416
1417        if (!this.lowReplicationRollEnabled) {
1418          // The new writer's replica count is always the default value,
1419          // so we should not enable the LowReplication-Roller yet. If numEntries
1420          // is less than or equal to 1, we consider it a new writer.
1421          if (this.numEntries.get() <= 1) {
1422            return;
1423          }
1424          // Once the live datanode number and the replicas return to normal,
1425          // enable the LowReplication-Roller.
1426          this.lowReplicationRollEnabled = true;
1427          LOG.info("LowReplication-Roller was enabled.");
1428        }
1429      }
1430    } catch (Exception e) {
      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas; "
          + "still proceeding ahead...", e);
1433    }
1434  }
1435
1436  /**
1437   * This method gets the datanode replication count for the current HLog.
1438   *
   * If the pipeline isn't started yet or is empty, you will get the default
   * replication factor.  This method returns 0 when the reflective
   * getNumCurrentReplicas handle or the output stream is unavailable, which
   * usually means you are not running with the HDFS-826 patch.
   * @throws IllegalArgumentException
   * @throws IllegalAccessException
   * @throws InvocationTargetException
1447   */
1448  int getLogReplication()
1449  throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1450    if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
1451      Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
1452      if (repl instanceof Integer) {
1453        return ((Integer)repl).intValue();
1454      }
1455    }
1456    return 0;
1457  }
1458
1459  boolean canGetCurReplicas() {
1460    return this.getNumCurrentReplicas != null;
1461  }
1462
1463  public void hsync() throws IOException {
1464    syncer();
1465  }
1466
1467  public void hflush() throws IOException {
1468    syncer();
1469  }
1470
1471  public void sync() throws IOException {
1472    syncer();
1473  }
1474
1475  public void sync(long txid) throws IOException {
1476    syncer(txid);
1477  }
1478
1479  private void requestLogRoll() {
1480    if (!this.listeners.isEmpty()) {
1481      for (WALActionsListener i: this.listeners) {
1482        i.logRollRequested();
1483      }
1484    }
1485  }
1486
1487  protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
1488                           HTableDescriptor htd)
1489  throws IOException {
1490    if (!this.enabled) {
1491      return;
1492    }
1493    if (!this.listeners.isEmpty()) {
1494      for (WALActionsListener i: this.listeners) {
1495        i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
1496      }
1497    }
1498    try {
1499      long now = System.currentTimeMillis();
1500      // coprocessor hook:
1501      if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
        // write to our buffer for the HLog file.
1503        logSyncer.append(new HLog.Entry(logKey, logEdit));
1504      }
1505      long took = System.currentTimeMillis() - now;
1506      coprocessorHost.postWALWrite(info, logKey, logEdit);
1507      writeTime.inc(took);
1508      long len = 0;
1509      for (KeyValue kv : logEdit.getKeyValues()) {
1510        len += kv.getLength();
1511      }
1512      writeSize.inc(len);
1513      if (took > 1000) {
1514        LOG.warn(String.format(
1515          "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
1516          Thread.currentThread().getName(), took, this.numEntries.get(),
1517          StringUtils.humanReadableInt(len)));
1518        slowHLogAppendCount.incrementAndGet();
1519        slowHLogAppendTime.inc(took);
1520      }
1521    } catch (IOException e) {
1522      LOG.fatal("Could not append. Requesting close of hlog", e);
1523      requestLogRoll();
1524      throw e;
1525    }
1526  }
1527
1528
1529  /** @return How many items have been added to the log */
1530  int getNumEntries() {
1531    return numEntries.get();
1532  }
1533
1534  /**
1535   * Obtain a log sequence number.
1536   */
1537  public long obtainSeqNum() {
1538    return this.logSeqNum.incrementAndGet();
1539  }
1540
1541  /** @return the number of log files in use, including current one */
1542  public int getNumLogFiles() {
1543    return outputfiles.size() + 1;
1544  }
1545  
1546  /** @return the total size of log files in use, including current one */
1547  public long getNumLogFileSize() {
1548    return totalLogSize.get() + curLogSize;
1549  }
1550
1551  private byte[] getSnapshotName(byte[] encodedRegionName) {
    byte[] snp = new byte[encodedRegionName.length + 3];
    // An encoded region name contains only hex digits; 's', 'n' and 'p' are
    // not hex digits, so snapshot names will never collide with encoded
    // region names.
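    // e.g. encoded region name "0123abcd" yields the snapshot key "snp0123abcd"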
1556    snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
    System.arraycopy(encodedRegionName, 0, snp, 3, encodedRegionName.length);
1560    return snp;
1561  }
1562
1563  /**
1564   * By acquiring a log sequence ID, we can allow log messages to continue while
1565   * we flush the cache.
1566   *
1567   * Acquire a lock so that we do not roll the log between the start and
1568   * completion of a cache-flush. Otherwise the log-seq-id for the flush will
1569   * not appear in the correct logfile.
1570   *
1571   * Ensuring that flushes and log-rolls don't happen concurrently also allows
1572   * us to temporarily put a log-seq-number in lastSeqWritten against the region
1573   * being flushed that might not be the earliest in-memory log-seq-number for
1574   * that region. By the time the flush is completed or aborted and before the
1575   * cacheFlushLock is released it is ensured that lastSeqWritten again has the
1576   * oldest in-memory edit's lsn for the region that was being flushed.
1577   *
   * In this method, by removing the entry in lastSeqWritten for the region
   * being flushed we ensure that the next edit inserted in this region will be
   * correctly recorded in
   * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}.
   * The lsn of the earliest in-memory edit - which is now in the memstore
   * snapshot - is saved temporarily in the lastSeqWritten map while the flush
   * is active.
1583   *
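   * <p>A minimal caller-side sketch of the flush protocol (<code>hlog</code>,
   * <code>encodedRegionName</code> and <code>tableName</code> are illustrative
   * local names, not fields of this class):
   * <pre>
   *   long flushSeqId = hlog.startCacheFlush(encodedRegionName);
   *   try {
   *     // ... flush the memstore snapshot to store files ...
   *     hlog.completeCacheFlush(encodedRegionName, tableName, flushSeqId, false);
   *   } catch (IOException ioe) {
   *     hlog.abortCacheFlush(encodedRegionName);
   *   }
   * </pre>
   *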
   * @return sequence ID to pass to
   *         {@link #completeCacheFlush(byte[], byte[], long, boolean)}
1587   * @see #completeCacheFlush(byte[], byte[], long, boolean)
1588   * @see #abortCacheFlush(byte[])
1589   */
1590  public long startCacheFlush(final byte[] encodedRegionName) {
1591    this.cacheFlushLock.lock();
1592    Long seq = this.lastSeqWritten.remove(encodedRegionName);
1593    // seq is the lsn of the oldest edit associated with this region. If a
1594    // snapshot already exists - because the last flush failed - then seq will
1595    // be the lsn of the oldest edit in the snapshot
1596    if (seq != null) {
1597      // keeping the earliest sequence number of the snapshot in
1598      // lastSeqWritten maintains the correctness of
1599      // getOldestOutstandingSeqNum(). But it doesn't matter really because
1600      // everything is being done inside of cacheFlush lock.
1601      Long oldseq =
1602        lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
1603      if (oldseq != null) {
        LOG.error("Logic Error: snapshot seq id from an earlier flush is still" +
            " present for region " + Bytes.toString(encodedRegionName) +
            "; overwritten oldseq=" + oldseq + " with new seq=" + seq);
1607        Runtime.getRuntime().halt(1);
1608      }
1609    }
1610    return obtainSeqNum();
1611  }
1612
1613
1614  /**
1615   * Complete the cache flush
1616   *
1617   * Protected by cacheFlushLock
1618   *
   * @param encodedRegionName the encoded name of the region that was flushed
   * @param tableName name of the table the flushed region belongs to
   * @param logSeqId sequence id obtained from {@link #startCacheFlush(byte[])}
   * @param isMetaRegion whether the flushed region is a meta region
   * @throws IOException
1623   */
1624  public void completeCacheFlush(final byte [] encodedRegionName,
1625      final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
1626  throws IOException {
1627    try {
1628      if (this.closed) {
1629        return;
1630      }
1631      long txid = 0;
1632      synchronized (updateLock) {
1633        long now = System.currentTimeMillis();
1634        WALEdit edit = completeCacheFlushLogEdit();
1635        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
1636            System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
1637        logSyncer.append(new Entry(key, edit));
1638        txid = this.unflushedEntries.incrementAndGet();
1639        writeTime.inc(System.currentTimeMillis() - now);
1640        long len = 0;
1641        for (KeyValue kv : edit.getKeyValues()) {
1642          len += kv.getLength();
1643        }
1644        writeSize.inc(len);
1645        this.numEntries.incrementAndGet();
1646      }
1647      // sync txn to file system
1648      this.sync(txid);
1649
1650    } finally {
1651      // updateLock not needed for removing snapshot's entry
1652      // Cleaning up of lastSeqWritten is in the finally clause because we
1653      // don't want to confuse getOldestOutstandingSeqNum()
1654      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
1655      this.cacheFlushLock.unlock();
1656    }
1657  }
1658
1659  private WALEdit completeCacheFlushLogEdit() {
1660    KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
1661      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
1662    WALEdit e = new WALEdit();
1663    e.add(kv);
1664    return e;
1665  }
1666
1667  /**
1668   * Abort a cache flush.
1669   * Call if the flush fails. Note that the only recovery for an aborted flush
1670   * currently is a restart of the regionserver so the snapshot content dropped
1671   * by the failure gets restored to the memstore.
1672   */
1673  public void abortCacheFlush(byte[] encodedRegionName) {
1674    Long snapshot_seq =
1675      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
1676    if (snapshot_seq != null) {
      // updateLock is not necessary because we are racing against
      // lastSeqWritten.putIfAbsent() in append() and we will always win.
      // Before releasing cacheFlushLock, make sure that the region's entry in
      // lastSeqWritten points to the earliest edit in the region.
1681      Long current_memstore_earliest_seq =
1682        this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
1683      if (current_memstore_earliest_seq != null &&
1684          (current_memstore_earliest_seq.longValue() <=
1685            snapshot_seq.longValue())) {
        LOG.error("Logic Error: region " + Bytes.toString(encodedRegionName) +
            " acquired edits out of order; current memstore seq=" +
            current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
1689        Runtime.getRuntime().halt(1);
1690      }
1691    }
1692    this.cacheFlushLock.unlock();
1693  }
1694
1695  /**
   * @param family column family name to check
   * @return true if the family is the WAL meta family
1698   */
1699  public static boolean isMetaFamily(byte [] family) {
1700    return Bytes.equals(METAFAMILY, family);
1701  }
1702
1703  /**
1704   * Get LowReplication-Roller status
1705   * 
1706   * @return lowReplicationRollEnabled
1707   */
1708  public boolean isLowReplicationRollEnabled() {
1709    return lowReplicationRollEnabled;
1710  }
1711
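  /**
   * @param conf configuration to consult
   * @return the {@link HLogKey} implementation named by the
   *         <code>hbase.regionserver.hlog.keyclass</code> property, defaulting
   *         to {@link HLogKey}.  Illustrative only: a deployment could plug in
   *         a custom key class with something like
   *         <code>conf.setClass("hbase.regionserver.hlog.keyclass",
   *         MyHLogKey.class, HLogKey.class)</code>, where
   *         <code>MyHLogKey</code> would be a hypothetical subclass of
   *         {@link HLogKey}.
   */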
1712  @SuppressWarnings("unchecked")
1713  public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
1714     return (Class<? extends HLogKey>)
1715       conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
1716  }
1717
1718  public static HLogKey newKey(Configuration conf) throws IOException {
1719    Class<? extends HLogKey> keyClass = getKeyClass(conf);
1720    try {
1721      return keyClass.newInstance();
    } catch (InstantiationException e) {
      throw new IOException("cannot create hlog key", e);
    } catch (IllegalAccessException e) {
      throw new IOException("cannot create hlog key", e);
    }
1727  }
1728
1729  /**
   * Utility class that lets us keep track of an edit together with its key.
   * Only used when splitting logs.
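   *
   * <p>Entries are typically constructed around an existing key/edit pair,
   * e.g. <code>new HLog.Entry(logKey, logEdit)</code> as in
   * {@link HLog#doWrite(HRegionInfo, HLogKey, WALEdit, HTableDescriptor)}.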
1732   */
1733  public static class Entry implements Writable {
1734    private WALEdit edit;
1735    private HLogKey key;
1736
1737    public Entry() {
1738      edit = new WALEdit();
1739      key = new HLogKey();
1740    }
1741
    /**
     * Constructor taking both the key and the edit.
     * @param key log's key
     * @param edit log's edit
     */
1747    public Entry(HLogKey key, WALEdit edit) {
1748      super();
1749      this.key = key;
1750      this.edit = edit;
1751    }
1752    /**
1753     * Gets the edit
1754     * @return edit
1755     */
1756    public WALEdit getEdit() {
1757      return edit;
1758    }
1759    /**
1760     * Gets the key
1761     * @return key
1762     */
1763    public HLogKey getKey() {
1764      return key;
1765    }
1766
1767    @Override
1768    public String toString() {
1769      return this.key + "=" + this.edit;
1770    }
1771
1772    @Override
1773    public void write(DataOutput dataOutput) throws IOException {
1774      this.key.write(dataOutput);
1775      this.edit.write(dataOutput);
1776    }
1777
1778    @Override
1779    public void readFields(DataInput dataInput) throws IOException {
1780      this.key.readFields(dataInput);
1781      this.edit.readFields(dataInput);
1782    }
1783  }
1784
1785  /**
1786   * Construct the HLog directory name
1787   *
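   * <p>For example, for a server named <code>host,60020,1300000000000</code>
   * the result is <code>HConstants.HREGION_LOGDIR_NAME +
   * "/host,60020,1300000000000"</code> (typically
   * <code>.logs/host,60020,1300000000000</code>).
   *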
1788   * @param serverName Server name formatted as described in {@link ServerName}
1789   * @return the HLog directory name
1790   */
1791  public static String getHLogDirectoryName(final String serverName) {
1792    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
1793    dirName.append("/");
1794    dirName.append(serverName);
1795    return dirName.toString();
1796  }
1797
1798  /**
1799   * Get the directory we are making logs in.
1800   * 
1801   * @return dir
1802   */
1803  protected Path getDir() {
1804    return dir;
1805  }
1806  
1807  /**
1808   * @param filename name of the file to validate
1809   * @return <tt>true</tt> if the filename matches an HLog, <tt>false</tt>
1810   *         otherwise
1811   */
1812  public static boolean validateHLogFilename(String filename) {
1813    return pattern.matcher(filename).matches();
1814  }
1815
1816  static Path getHLogArchivePath(Path oldLogDir, Path p) {
1817    return new Path(oldLogDir, p.getName());
1818  }
1819
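  /**
   * Formats a sequence id as a fixed-width (19-digit), zero-padded file name
   * so that recovered-edits files sort in sequence-id order; for example,
   * seqid 124 becomes <code>0000000000000000124</code>.
   */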
1820  static String formatRecoveredEditsFileName(final long seqid) {
1821    return String.format("%019d", seqid);
1822  }
1823
1824  /**
1825   * Returns sorted set of edit files made by wal-log splitter, excluding files
1826   * with '.temp' suffix.
   * @param fs filesystem holding the split edit files
   * @param regiondir region directory to look under
1829   * @return Files in passed <code>regiondir</code> as a sorted set.
1830   * @throws IOException
1831   */
1832  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
1833      final Path regiondir)
1834  throws IOException {
1835    NavigableSet<Path> filesSorted = new TreeSet<Path>();
1836    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
1837    if (!fs.exists(editsdir)) return filesSorted;
1838    FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
1839      @Override
1840      public boolean accept(Path p) {
1841        boolean result = false;
1842        try {
1843          // Return files and only files that match the editfile names pattern.
1844          // There can be other files in this directory other than edit files.
1845          // In particular, on error, we'll move aside the bad edit file giving
1846          // it a timestamp suffix.  See moveAsideBadEditsFile.
1847          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
1848          result = fs.isFile(p) && m.matches();
          // Skip files whose names end with RECOVERED_LOG_TMPFILE_SUFFIX,
          // because that means the log-splitting thread is still writing them.
1851          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
1852            result = false;
1853          }
1854        } catch (IOException e) {
1855          LOG.warn("Failed isFile check on " + p);
1856        }
1857        return result;
1858      }
1859    });
1860    if (files == null) return filesSorted;
1861    for (FileStatus status: files) {
1862      filesSorted.add(status.getPath());
1863    }
1864    return filesSorted;
1865  }
1866
1867  /**
1868   * Move aside a bad edits file.
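   * The file is renamed in place to its original name plus a
   * <code>"." + System.currentTimeMillis()</code> suffix; for example,
   * <code>0000000000000000124</code> becomes something like
   * <code>0000000000000000124.1300000000000</code>.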
   * @param fs filesystem holding the edits file
1870   * @param edits Edits file to move aside.
1871   * @return The name of the moved aside file.
1872   * @throws IOException
1873   */
1874  public static Path moveAsideBadEditsFile(final FileSystem fs,
1875      final Path edits)
1876  throws IOException {
1877    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
1878      System.currentTimeMillis());
1879    if (!HBaseFileSystem.renameDirForFileSystem(fs, edits, moveAsideName)) {
1880      LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
1881    }
1882    return moveAsideName;
1883  }
1884
1885  /**
   * @param regiondir This region's directory in the filesystem.
1887   * @return The directory that holds recovered edits files for the region
1888   * <code>regiondir</code>
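   *         (i.e. <code>regiondir/RECOVERED_EDITS_DIR</code>, typically
   *         <code>regiondir/recovered.edits</code>)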
1889   */
1890  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
1891    return new Path(regiondir, RECOVERED_EDITS_DIR);
1892  }
1893
1894  public static final long FIXED_OVERHEAD = ClassSize.align(
1895    ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1896    ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1897
1898  private static void usage() {
1899    System.err.println("Usage: HLog <ARGS>");
1900    System.err.println("Arguments:");
    System.err.println(" --dump  Dump a textual representation of one or more log files");
1902    System.err.println("         For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1903    System.err.println(" --split Split the passed directory of WAL logs");
1904    System.err.println("         For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1905  }
1906
1907  private static void split(final Configuration conf, final Path p)
1908  throws IOException {
1909    FileSystem fs = FileSystem.get(conf);
1910    if (!fs.exists(p)) {
1911      throw new FileNotFoundException(p.toString());
1912    }
1913    final Path baseDir = new Path(conf.get(HConstants.HBASE_DIR));
1914    final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1915    if (!fs.getFileStatus(p).isDir()) {
1916      throw new IOException(p + " is not a directory");
1917    }
1918
1919    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
1920        conf, baseDir, p, oldLogDir, fs);
1921    logSplitter.splitLog();
1922  }
1923
1924  /**
1925   * @return Coprocessor host.
1926   */
1927  public WALCoprocessorHost getCoprocessorHost() {
1928    return coprocessorHost;
1929  }
1930
  /** @return true if there are entries appended but not yet synced; exposed for tests */
1932  boolean hasDeferredEntries() {
1933    return lastDeferredTxid > syncedTillHere;
1934  }
1935
1936  /**
1937   * Pass one or more log file names and it will either dump out a text version
1938   * on <code>stdout</code> or split the specified log files.
1939   *
1940   * @param args
1941   * @throws IOException
1942   */
1943  public static void main(String[] args) throws IOException {
1944    if (args.length < 2) {
1945      usage();
1946      System.exit(-1);
1947    }
1948    // either dump using the HLogPrettyPrinter or split, depending on args
1949    if (args[0].compareTo("--dump") == 0) {
1950      HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1951    } else if (args[0].compareTo("--split") == 0) {
1952      Configuration conf = HBaseConfiguration.create();
1953      for (int i = 1; i < args.length; i++) {
1954        try {
1955          conf.set("fs.default.name", args[i]);
1956          conf.set("fs.defaultFS", args[i]);
1957          Path logPath = new Path(args[i]);
1958          split(conf, logPath);
1959        } catch (Throwable t) {
1960          t.printStackTrace(System.err);
1961          System.exit(-1);
1962        }
1963      }
1964    } else {
1965      usage();
1966      System.exit(-1);
1967    }
1968  }
1969}