001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
021import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
023
024import com.lmax.disruptor.RingBuffer;
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.management.MemoryType;
029import java.net.URLEncoder;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Comparator;
033import java.util.List;
034import java.util.Map;
035import java.util.OptionalLong;
036import java.util.Set;
037import java.util.concurrent.ConcurrentNavigableMap;
038import java.util.concurrent.ConcurrentSkipListMap;
039import java.util.concurrent.CopyOnWriteArrayList;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.atomic.AtomicInteger;
044import java.util.concurrent.atomic.AtomicLong;
045import java.util.concurrent.locks.ReentrantLock;
046import org.apache.commons.lang3.mutable.MutableLong;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FileStatus;
049import org.apache.hadoop.fs.FileSystem;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.fs.PathFilter;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.PrivateCellUtil;
056import org.apache.hadoop.hbase.client.RegionInfo;
057import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
058import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
059import org.apache.hadoop.hbase.ipc.RpcServer;
060import org.apache.hadoop.hbase.ipc.ServerCall;
061import org.apache.hadoop.hbase.log.HBaseMarkers;
062import org.apache.hadoop.hbase.regionserver.HRegion;
063import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
064import org.apache.hadoop.hbase.trace.TraceUtil;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.CommonFSUtils;
067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
068import org.apache.hadoop.hbase.util.FSUtils;
069import org.apache.hadoop.hbase.util.Pair;
070import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
071import org.apache.hadoop.hbase.wal.WAL;
072import org.apache.hadoop.hbase.wal.WALEdit;
073import org.apache.hadoop.hbase.wal.WALFactory;
074import org.apache.hadoop.hbase.wal.WALKeyImpl;
075import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
076import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
077import org.apache.hadoop.hbase.wal.WALSplitter;
078import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
079import org.apache.hadoop.util.StringUtils;
080import org.apache.htrace.core.TraceScope;
081import org.apache.yetus.audience.InterfaceAudience;
082import org.slf4j.Logger;
083import org.slf4j.LoggerFactory;
084
085import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
086
087/**
088 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
089 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
090 * This is done internal to the implementation.
091 * <p>
092 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
093 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
094 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
095 * flushed out to hfiles, and what is yet in WAL and in memory only.
096 * <p>
097 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
098 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
099 * (smaller) than the most-recent flush.
100 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
103 * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
104 * is now a lame duck; any more appends or syncs will fail also with the same original exception. If
105 * we have made successful appends to the WAL and we then are unable to sync them, our current
106 * semantic is to return error to the client that the appends failed but also to abort the current
107 * context, usually the hosting server. We need to replay the WALs. <br>
108 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client
109 * that the append failed. <br>
110 * TODO: replication may pick up these last edits though they have been marked as failed append
111 * (Need to keep our own file lengths, not rely on HDFS).
112 */
113@InterfaceAudience.Private
114public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
115
  /** Logger shared by this class. */
  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);

  /** Default used for "hbase.regionserver.hlog.slowsync.ms" when it is not configured. */
  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms

  /** Default used for "hbase.regionserver.hlog.sync.timeout" when it is not configured. */
  private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
121
  /**
   * File system instance on which the WAL directory and the WAL archive directory are created.
   */
  protected final FileSystem fs;

  /**
   * WAL directory, where all WAL files would be placed.
   */
  protected final Path walDir;

  /**
   * Directory path where old logs are kept.
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those WAL files that belong to this WAL instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated WAL file names.
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for WAL membership: the WAL directory joined with the file prefix
   * and the file name delimiter.
   */
  protected final String prefixPathStr;

  /** Coprocessor host that is notified before and after a WAL roll. */
  protected final WALCoprocessorHost coprocessorHost;

  /**
   * Configuration object this WAL reads its settings from.
   */
  protected final Configuration conf;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  /** Value of "hbase.regionserver.hlog.slowsync.ms", converted to nanoseconds. */
  protected final long slowSyncNs;

  /** Value of "hbase.regionserver.hlog.sync.timeout", converted to nanoseconds. */
  private final long walSyncTimeoutNs;

  // If the log is larger than this size, roll it. Computed as blocksize multiplied by
  // "hbase.regionserver.logroll.multiplier".
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If more than this many logs, force flush of oldest region so oldest edit goes to disk. If too
   * many and we crash, then will take forever replaying. Keep the number of logs tidy.
   */
  protected final int maxLogs;

  /** Value of the {@code HRegion.WAL_HSYNC_CONF_KEY} setting. */
  protected final boolean useHsync;
193
  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warning when it thinks synchronized controls writer thread safety. It is held when we are
   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
   * not. The lock is created in fair mode.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created; -1 until the first file number is set.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current WAL; reset to zero when the WAL is rolled.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdit to background consumer thread, and the transaction id is the sequence number of the
   * corresponding entry in queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
   * for it.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of the rolled WAL files that have not been archived yet.
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Writer for the current log file.
   */
  volatile W writer;

  // Last time we checked for low replication on the WAL's pipeline.
  private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  /** Once set, rolling is skipped and blocked syncs fail with "WAL has been closed". */
  protected volatile boolean closed = false;

  /** Shutdown flag; NOTE(review): its readers and writers are outside this part of the file. */
  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
244
245  private static final class WalProps {
246
247    /**
248     * Map the encoded region name to the highest sequence id.
249     * <p/>Contains all the regions it has an entry for.
250     */
251    public final Map<byte[], Long> encodedName2HighestSequenceId;
252
253    /**
254     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
255     * sub classes.
256     */
257    public final long logSize;
258
259    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
260      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
261      this.logSize = logSize;
262    }
263  }
264
  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

  /**
   * Per-thread cache of {@link SyncFuture}s, used so we reuse SyncFutures. Thread local is used so
   * JVM can GC the terminated thread for us. See HBASE-21228.
   */
  private final ThreadLocal<SyncFuture> cachedSyncFutures;

  /**
   * The class name of the runtime implementation, used as prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
   * refer to HBASE-17676 for more details
   * </p>
   */
  protected final String implClassName;

  /** Roll-request flag; NOTE(review): its readers and writers are outside this part of the file. */
  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
289
290  public long getFilenum() {
291    return this.filenum.get();
292  }
293
294  /**
295   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper
296   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
297   * the filename is created with the {@link #computeFilename(long filenum)} method.
298   * @return timestamp, as in the log file name.
299   */
300  protected long getFileNumFromFileName(Path fileName) {
301    checkNotNull(fileName, "file name can't be null");
302    if (!ourFiles.accept(fileName)) {
303      throw new IllegalArgumentException(
304          "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
305    }
306    final String fileNameString = fileName.toString();
307    String chompedPath = fileNameString.substring(prefixPathStr.length(),
308      (fileNameString.length() - walFileSuffix.length()));
309    return Long.parseLong(chompedPath);
310  }
311
312  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
313    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
314    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
315  }
316
317  // must be power of 2
318  protected final int getPreallocatedEventCount() {
319    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
320    // be stuck and make no progress if the buffer is filled with appends only and there is no
321    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
322    // before they return.
323    int preallocatedEventCount =
324      this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
325    checkArgument(preallocatedEventCount >= 0,
326      "hbase.regionserver.wal.disruptor.event.count must > 0");
327    int floor = Integer.highestOneBit(preallocatedEventCount);
328    if (floor == preallocatedEventCount) {
329      return floor;
330    }
331    // max capacity is 1 << 30
332    if (floor >= 1 << 29) {
333      return 1 << 30;
334    }
335    return floor << 1;
336  }
337
338  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
339      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
340      final boolean failIfWALExists, final String prefix, final String suffix)
341      throws FailedLogCloseException, IOException {
342    this.fs = fs;
343    this.walDir = new Path(rootDir, logDir);
344    this.walArchiveDir = new Path(rootDir, archiveDir);
345    this.conf = conf;
346
347    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
348      throw new IOException("Unable to mkdir " + walDir);
349    }
350
351    if (!fs.exists(this.walArchiveDir)) {
352      if (!fs.mkdirs(this.walArchiveDir)) {
353        throw new IOException("Unable to mkdir " + this.walArchiveDir);
354      }
355    }
356
357    // If prefix is null||empty then just name it wal
358    this.walFilePrefix =
359      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
360    // we only correctly differentiate suffices when numeric ones start with '.'
361    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
362      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
363        "' but instead was '" + suffix + "'");
364    }
365    // Now that it exists, set the storage policy for the entire directory of wal files related to
366    // this FSHLog instance
367    String storagePolicy =
368        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
369    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
370    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
371    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
372
373    this.ourFiles = new PathFilter() {
374      @Override
375      public boolean accept(final Path fileName) {
376        // The path should start with dir/<prefix> and end with our suffix
377        final String fileNameString = fileName.toString();
378        if (!fileNameString.startsWith(prefixPathStr)) {
379          return false;
380        }
381        if (walFileSuffix.isEmpty()) {
382          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
383          return org.apache.commons.lang3.StringUtils
384              .isNumeric(fileNameString.substring(prefixPathStr.length()));
385        } else if (!fileNameString.endsWith(walFileSuffix)) {
386          return false;
387        }
388        return true;
389      }
390    };
391
392    if (failIfWALExists) {
393      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
394      if (null != walFiles && 0 != walFiles.length) {
395        throw new IOException("Target WAL already exists within directory " + walDir);
396      }
397    }
398
399    // Register listeners. TODO: Should this exist anymore? We have CPs?
400    if (listeners != null) {
401      for (WALActionsListener i : listeners) {
402        registerWALActionsListener(i);
403      }
404    }
405    this.coprocessorHost = new WALCoprocessorHost(this, conf);
406
407    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
408    // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost
409    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
410    // the block size but experience from the field has it that this was not enough time for the
411    // roll to happen before end-of-block. So the new accounting makes WALs of about the same
412    // size as those made in hbase-1 (to prevent surprise), we now have default block size as
413    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
414    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
415    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
416    float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f);
417    this.logrollsize = (long)(this.blocksize * multiplier);
418    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
419      Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
420
421    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
422      StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
423      walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
424    this.slowSyncNs = TimeUnit.MILLISECONDS
425        .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS));
426    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS
427        .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS));
428    this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
429      @Override
430      protected SyncFuture initialValue() {
431        return new SyncFuture();
432      }
433    };
434    this.implClassName = getClass().getSimpleName();
435    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
436  }
437
  /**
   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
   * @throws IOException if {@link #rollWriter()} fails
   */
  public void init() throws IOException {
    rollWriter();
  }
444
445  @Override
446  public void registerWALActionsListener(WALActionsListener listener) {
447    this.listeners.add(listener);
448  }
449
450  @Override
451  public boolean unregisterWALActionsListener(WALActionsListener listener) {
452    return this.listeners.remove(listener);
453  }
454
455  @Override
456  public WALCoprocessorHost getCoprocessorHost() {
457    return coprocessorHost;
458  }
459
460  @Override
461  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
462    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
463  }
464
465  @Override
466  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
467    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
468  }
469
470  @Override
471  public void completeCacheFlush(byte[] encodedRegionName) {
472    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
473  }
474
475  @Override
476  public void abortCacheFlush(byte[] encodedRegionName) {
477    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
478  }
479
480  @Override
481  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
482    // Used by tests. Deprecated as too subtle for general usage.
483    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
484  }
485
486  @Override
487  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
488    // This method is used by tests and for figuring if we should flush or not because our
489    // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use
490    // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
491    // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the
492    // currently flushing sequence ids, and if anything found there, it is returning these. This is
493    // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if
494    // we crash during the flush. For figuring what to flush, we might get requeued if our sequence
495    // id is old even though we are currently flushing. This may mean we do too much flushing.
496    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
497  }
498
  /**
   * Rolls the writer without forcing, i.e. calls {@link #rollWriter(boolean)} with {@code false}.
   * @return whatever {@link #rollWriter(boolean)} returns
   */
  @Override
  public byte[][] rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }
503
504  /**
505   * This is a convenience method that computes a new filename with a given file-number.
506   * @param filenum to use
507   * @return Path
508   */
509  protected Path computeFilename(final long filenum) {
510    if (filenum < 0) {
511      throw new RuntimeException("WAL file number can't be < 0");
512    }
513    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
514    return new Path(walDir, child);
515  }
516
517  /**
518   * This is a convenience method that computes a new filename with a given using the current WAL
519   * file-number
520   * @return Path
521   */
522  public Path getCurrentFileName() {
523    return computeFilename(this.filenum.get());
524  }
525
526  /**
527   * retrieve the next path to use for writing. Increments the internal filenum.
528   */
529  private Path getNewPath() throws IOException {
530    this.filenum.set(System.currentTimeMillis());
531    Path newPath = getCurrentFileName();
532    while (fs.exists(newPath)) {
533      this.filenum.incrementAndGet();
534      newPath = getCurrentFileName();
535    }
536    return newPath;
537  }
538
539  @VisibleForTesting
540  Path getOldPath() {
541    long currentFilenum = this.filenum.get();
542    Path oldPath = null;
543    if (currentFilenum > 0) {
544      // ComputeFilename will take care of meta wal filename
545      oldPath = computeFilename(currentFilenum);
546    } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine?
547    return oldPath;
548  }
549
550  /**
551   * Tell listeners about pre log roll.
552   */
553  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
554      throws IOException {
555    coprocessorHost.preWALRoll(oldPath, newPath);
556
557    if (!this.listeners.isEmpty()) {
558      for (WALActionsListener i : this.listeners) {
559        i.preLogRoll(oldPath, newPath);
560      }
561    }
562  }
563
564  /**
565   * Tell listeners about post log roll.
566   */
567  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
568      throws IOException {
569    if (!this.listeners.isEmpty()) {
570      for (WALActionsListener i : this.listeners) {
571        i.postLogRoll(oldPath, newPath);
572      }
573    }
574
575    coprocessorHost.postWALRoll(oldPath, newPath);
576  }
577
578  // public only until class moves to o.a.h.h.wal
579  /** @return the number of rolled log files */
580  public int getNumRolledLogFiles() {
581    return walFile2Props.size();
582  }
583
584  // public only until class moves to o.a.h.h.wal
585  /** @return the number of log files in use */
586  public int getNumLogFiles() {
587    // +1 for current use log
588    return getNumRolledLogFiles() + 1;
589  }
590
591  /**
592   * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
593   * check the first (oldest) WAL, and return those regions which should be flushed so that
594   * it can be let-go/'archived'.
595   * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
596   */
597  byte[][] findRegionsToForceFlush() throws IOException {
598    byte[][] regions = null;
599    int logCount = getNumRolledLogFiles();
600    if (logCount > this.maxLogs && logCount > 0) {
601      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
602      regions =
603        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
604    }
605    if (regions != null) {
606      StringBuilder sb = new StringBuilder();
607      for (int i = 0; i < regions.length; i++) {
608        if (i > 0) {
609          sb.append(", ");
610        }
611        sb.append(Bytes.toStringBinary(regions[i]));
612      }
613      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
614        "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
615    }
616    return regions;
617  }
618
619  /**
620   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
621   */
622  private void cleanOldLogs() throws IOException {
623    List<Pair<Path, Long>> logsToArchive = null;
624    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
625    // are older than what is currently in memory, the WAL can be GC'd.
626    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
627      Path log = e.getKey();
628      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
629      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
630        if (logsToArchive == null) {
631          logsToArchive = new ArrayList<>();
632        }
633        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
634        if (LOG.isTraceEnabled()) {
635          LOG.trace("WAL file ready for archiving " + log);
636        }
637      }
638    }
639    if (logsToArchive != null) {
640      for (Pair<Path, Long> logAndSize : logsToArchive) {
641        this.totalLogSize.addAndGet(-logAndSize.getSecond());
642        archiveLogFile(logAndSize.getFirst());
643        this.walFile2Props.remove(logAndSize.getFirst());
644      }
645    }
646  }
647
648  /*
649   * only public so WALSplitter can use.
650   * @return archived location of a WAL file with the given path p
651   */
652  public static Path getWALArchivePath(Path archiveDir, Path p) {
653    return new Path(archiveDir, p.getName());
654  }
655
656  private void archiveLogFile(final Path p) throws IOException {
657    Path newPath = getWALArchivePath(this.walArchiveDir, p);
658    // Tell our listeners that a log is going to be archived.
659    if (!this.listeners.isEmpty()) {
660      for (WALActionsListener i : this.listeners) {
661        i.preLogArchive(p, newPath);
662      }
663    }
664    LOG.info("Archiving " + p + " to " + newPath);
665    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
666      throw new IOException("Unable to rename " + p + " to " + newPath);
667    }
668    // Tell our listeners that a log has been archived.
669    if (!this.listeners.isEmpty()) {
670      for (WALActionsListener i : this.listeners) {
671        i.postLogArchive(p, newPath);
672      }
673    }
674  }
675
676  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
677    int oldNumEntries = this.numEntries.getAndSet(0);
678    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
679    if (oldPath != null) {
680      this.walFile2Props.put(oldPath,
681        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
682      this.totalLogSize.addAndGet(oldFileLen);
683      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
684        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
685        newPathString);
686    } else {
687      LOG.info("New WAL {}", newPathString);
688    }
689  }
690
691  /**
692   * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
693   * <p/>
694   * <ul>
695   * <li>In the case of creating a new WAL, oldPath will be null.</li>
696   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
697   * </li>
698   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
699   * null.</li>
700   * </ul>
701   * @param oldPath may be null
702   * @param newPath may be null
703   * @param nextWriter may be null
704   * @return the passed in <code>newPath</code>
705   * @throws IOException if there is a problem flushing or closing the underlying FS
706   */
707  @VisibleForTesting
708  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
709    try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
710      doReplaceWriter(oldPath, newPath, nextWriter);
711      return newPath;
712    }
713  }
714
715  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
716    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
717    try {
718      if (syncFuture != null) {
719        if (closed) {
720          throw new IOException("WAL has been closed");
721        } else {
722          syncFuture.get(walSyncTimeoutNs);
723        }
724      }
725    } catch (TimeoutIOException tioe) {
726      // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
727      // still refer to it, so if this thread use it next time may get a wrong
728      // result.
729      this.cachedSyncFutures.remove();
730      throw tioe;
731    } catch (InterruptedException ie) {
732      LOG.warn("Interrupted", ie);
733      throw convertInterruptedExceptionToIOException(ie);
734    } catch (ExecutionException e) {
735      throw ensureIOException(e.getCause());
736    }
737  }
738
739  private static IOException ensureIOException(final Throwable t) {
740    return (t instanceof IOException) ? (IOException) t : new IOException(t);
741  }
742
743  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
744    Thread.currentThread().interrupt();
745    IOException ioe = new InterruptedIOException();
746    ioe.initCause(ie);
747    return ioe;
748  }
749
750  @Override
751  public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
752    rollWriterLock.lock();
753    try {
754      // Return if nothing to flush.
755      if (!force && this.writer != null && this.numEntries.get() <= 0) {
756        return null;
757      }
758      byte[][] regionsToFlush = null;
759      if (this.closed) {
760        LOG.debug("WAL closed. Skipping rolling of writer");
761        return regionsToFlush;
762      }
763      try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
764        Path oldPath = getOldPath();
765        Path newPath = getNewPath();
766        // Any exception from here on is catastrophic, non-recoverable so we currently abort.
767        W nextWriter = this.createWriterInstance(newPath);
768        tellListenersAboutPreLogRoll(oldPath, newPath);
769        // NewPath could be equal to oldPath if replaceWriter fails.
770        newPath = replaceWriter(oldPath, newPath, nextWriter);
771        tellListenersAboutPostLogRoll(oldPath, newPath);
772        if (LOG.isDebugEnabled()) {
773          LOG.debug("Create new " + implClassName + " writer with pipeline: " +
774            Arrays.toString(getPipeline()));
775        }
776        // Can we delete any of the old log files?
777        if (getNumRolledLogFiles() > 0) {
778          cleanOldLogs();
779          regionsToFlush = findRegionsToForceFlush();
780        }
781      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
782        // If the underlying FileSystem can't do what we ask, treat as IO failure so
783        // we'll abort.
784        throw new IOException(
785            "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
786            exception);
787      }
788      return regionsToFlush;
789    } finally {
790      rollWriterLock.unlock();
791    }
792  }
793
794  // public only until class moves to o.a.h.h.wal
795  /** @return the size of log files in use */
796  public long getLogFileSize() {
797    return this.totalLogSize.get();
798  }
799
800  // public only until class moves to o.a.h.h.wal
801  public void requestLogRoll() {
802    requestLogRoll(false);
803  }
804
805  /**
806   * Get the backing files associated with this WAL.
807   * @return may be null if there are no files.
808   */
809  @VisibleForTesting
810  FileStatus[] getFiles() throws IOException {
811    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
812  }
813
814  @Override
815  public void shutdown() throws IOException {
816    if (!shutdown.compareAndSet(false, true)) {
817      return;
818    }
819    closed = true;
820    // Tell our listeners that the log is closing
821    if (!this.listeners.isEmpty()) {
822      for (WALActionsListener i : this.listeners) {
823        i.logCloseRequested();
824      }
825    }
826    rollWriterLock.lock();
827    try {
828      doShutdown();
829    } finally {
830      rollWriterLock.unlock();
831    }
832  }
833
834  @Override
835  public void close() throws IOException {
836    shutdown();
837    final FileStatus[] files = getFiles();
838    if (null != files && 0 != files.length) {
839      for (FileStatus file : files) {
840        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
841        // Tell our listeners that a log is going to be archived.
842        if (!this.listeners.isEmpty()) {
843          for (WALActionsListener i : this.listeners) {
844            i.preLogArchive(file.getPath(), p);
845          }
846        }
847
848        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
849          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
850        }
851        // Tell our listeners that a log was archived.
852        if (!this.listeners.isEmpty()) {
853          for (WALActionsListener i : this.listeners) {
854            i.postLogArchive(file.getPath(), p);
855          }
856        }
857      }
858      LOG.debug(
859        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
860    }
861    LOG.info("Closed WAL: " + toString());
862  }
863
864  /**
865   * updates the sequence number of a specific store. depending on the flag: replaces current seq
866   * number if the given seq id is bigger, or even if it is lower than existing one
867   */
868  @Override
869  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
870      boolean onlyIfGreater) {
871    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
872  }
873
874  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
875    return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
876  }
877
878  protected boolean isLogRollRequested() {
879    return rollRequested.get();
880  }
881
882  protected final void requestLogRoll(boolean tooFewReplicas) {
883    // If we have already requested a roll, don't do it again
884    // And only set rollRequested to true when there is a registered listener
885    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
886      for (WALActionsListener i : this.listeners) {
887        i.logRollRequested(tooFewReplicas);
888      }
889    }
890  }
891
892  long getUnflushedEntriesCount() {
893    long highestSynced = this.highestSyncedTxid.get();
894    long highestUnsynced = this.highestUnsyncedTxid;
895    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
896  }
897
898  boolean isUnflushedEntries() {
899    return getUnflushedEntriesCount() > 0;
900  }
901
902  /**
903   * Exposed for testing only. Use to tricks like halt the ring buffer appending.
904   */
905  @VisibleForTesting
906  protected void atHeadOfRingBufferEventHandlerAppend() {
907    // Noop
908  }
909
910  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
911    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
912    atHeadOfRingBufferEventHandlerAppend();
913    long start = EnvironmentEdgeManager.currentTime();
914    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
915    long regionSequenceId = entry.getKey().getSequenceId();
916
917    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
918    // region sequence id only, a region edit/sequence id that is not associated with an actual
919    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
920    if (entry.getEdit().isEmpty()) {
921      return false;
922    }
923
924    // Coprocessor hook.
925    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
926    if (!listeners.isEmpty()) {
927      for (WALActionsListener i : listeners) {
928        i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
929      }
930    }
931    doAppend(writer, entry);
932    assert highestUnsyncedTxid < entry.getTxid();
933    highestUnsyncedTxid = entry.getTxid();
934    if (entry.isCloseRegion()) {
935      // let's clean all the records of this region
936      sequenceIdAccounting.onRegionClose(encodedRegionName);
937    } else {
938      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
939        entry.isInMemStore());
940    }
941    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
942    // Update metrics.
943    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
944    numEntries.incrementAndGet();
945    return true;
946  }
947
948  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
949    long len = 0;
950    if (!listeners.isEmpty()) {
951      for (Cell cell : e.getEdit().getCells()) {
952        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
953      }
954      for (WALActionsListener listener : listeners) {
955        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
956      }
957    }
958    return len;
959  }
960
961  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
962    if (timeInNanos > this.slowSyncNs) {
963      String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
964          .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
965      TraceUtil.addTimelineAnnotation(msg);
966      LOG.info(msg);
967    }
968    if (!listeners.isEmpty()) {
969      for (WALActionsListener listener : listeners) {
970        listener.postSync(timeInNanos, handlerSyncs);
971      }
972    }
973  }
974
975  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
976    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
977    throws IOException {
978    if (this.closed) {
979      throw new IOException(
980        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
981    }
982    MutableLong txidHolder = new MutableLong();
983    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
984      txidHolder.setValue(ringBuffer.next());
985    });
986    long txid = txidHolder.longValue();
987    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
988      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
989    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
990      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
991      entry.stampRegionSequenceId(we);
992      ringBuffer.get(txid).load(entry);
993    } finally {
994      ringBuffer.publish(txid);
995    }
996    return txid;
997  }
998
999  @Override
1000  public String toString() {
1001    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
1002  }
1003
1004  /**
1005   * if the given {@code path} is being written currently, then return its length.
1006   * <p>
1007   * This is used by replication to prevent replicating unacked log entries. See
1008   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
1009   */
1010  @Override
1011  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
1012    rollWriterLock.lock();
1013    try {
1014      Path currentPath = getOldPath();
1015      if (path.equals(currentPath)) {
1016        W writer = this.writer;
1017        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
1018      } else {
1019        return OptionalLong.empty();
1020      }
1021    } finally {
1022      rollWriterLock.unlock();
1023    }
1024  }
1025
1026  @Override
1027  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1028    return append(info, key, edits, true);
1029  }
1030
1031  @Override
1032  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
1033    throws IOException {
1034    return append(info, key, edits, false);
1035  }
1036
  /**
   * Append a set of edits to the WAL. Both <code>appendData</code> (inMemstore = true) and
   * <code>appendMarker</code> (inMemstore = false) delegate to this method.
   * <p>
   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
   * <p>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin', in which we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or
   * a variant, otherwise mvcc will get stuck. Do it in the finally of a try/finally block within
   * which this append lives, together with any subsequent operations like sync or update of
   * memstore, etc. Get the WriteEntry to pass to mvcc out of the passed in <code>key</code>
   * parameter. Be warned that the WriteEntry is not immediately available on return from this
   * method. It WILL be available subsequent to a sync of this append; otherwise, you will just
   * have to wait on the WriteEntry to get filled in.
   * @param info the regioninfo associated with append
   * @param key Modified by this call; we add to it this edits region edit/sequence id.
   * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
   *          sequence id that is after all currently appended edits.
   * @param inMemstore Always true except for case where we are writing a region event meta
   *          marker edit, for example, a compaction completion record into the WAL or noting a
   *          Region Open event. In these cases the entry is just so we can finish an unfinished
   *          compaction after a crash when the new Server reads the WAL on recovery, etc. These
   *          transition event 'Markers' do not go via the memstore. When inMemstore is false,
   *          we presume a Marker event edit.
   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
   *         in it.
   * @throws IOException if the append fails
   */
  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
      throws IOException;
1069
  /**
   * Implementation hook that performs the actual write of an entry. Called by
   * <code>appendEntry</code> for entries with a non-empty edit, after the coprocessor pre-write
   * hook and the listeners' before-write visit have run.
   * @param writer the writer the entry should be written with
   * @param entry the entry to write
   * @throws IOException if the write fails
   */
  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
1071
  /**
   * Implementation hook that creates the writer for a new WAL file. <code>rollWriter</code> calls
   * this with the new path before swapping writers.
   * @param path the path the new writer is created for
   * @return the new writer
   * @throws IOException if the writer cannot be created
   * @throws CommonFSUtils.StreamLacksCapabilityException if the underlying stream lacks a required
   *           capability; <code>rollWriter</code> rethrows this wrapped in an IOException
   */
  protected abstract W createWriterInstance(Path path)
      throws IOException, CommonFSUtils.StreamLacksCapabilityException;
1074
  /**
   * Implementation hook, invoked by <code>replaceWriter</code>, that swaps the current writer for
   * <code>nextWriter</code>.
   * <p>
   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
   * will begin to work before returning from this method. If we clear the flag after returning from
   * this call, we may miss a roll request. The implementation class should choose a proper place to
   * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
   * start writing to the new writer.
   * @param oldPath path of the WAL file being replaced
   * @param newPath path of the WAL file <code>nextWriter</code> was created for
   * @param nextWriter the writer to switch to
   * @throws IOException if the swap fails
   */
  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
      throws IOException;
1084
  /**
   * Implementation specific part of shutdown. <code>shutdown()</code> invokes this at most once,
   * while holding the roll-writer lock, after the WAL has been marked closed and the listeners
   * have been told a close was requested.
   * @throws IOException if shutting down fails
   */
  protected abstract void doShutdown() throws IOException;
1086
  /**
   * Implementation specific low-replication check, called by
   * <code>checkLogLowReplication</code> while it holds the roll-writer lock.
   * @return true to make the caller request a log roll flagged as caused by too few replicas
   */
  protected abstract boolean doCheckLogLowReplication();
1088
1089  public void checkLogLowReplication(long checkInterval) {
1090    long now = EnvironmentEdgeManager.currentTime();
1091    if (now - lastTimeCheckLowReplication < checkInterval) {
1092      return;
1093    }
1094    // Will return immediately if we are in the middle of a WAL log roll currently.
1095    if (!rollWriterLock.tryLock()) {
1096      return;
1097    }
1098    try {
1099      lastTimeCheckLowReplication = now;
1100      if (doCheckLogLowReplication()) {
1101        requestLogRoll(true);
1102      }
1103    } finally {
1104      rollWriterLock.unlock();
1105    }
1106  }
1107
  /**
   * This method gets the pipeline for the current WAL. The result is rendered with
   * <code>Arrays.toString</code> in the debug message logged on a roll and in the slow-sync
   * message.
   * @return the datanodes making up the pipeline
   */
  @VisibleForTesting
  abstract DatanodeInfo[] getPipeline();
1113
  /**
   * This method gets the datanode replication count for the current WAL.
   * @return the replication count
   */
  @VisibleForTesting
  abstract int getLogReplication();
1119
1120  private static void split(final Configuration conf, final Path p) throws IOException {
1121    FileSystem fs = FSUtils.getWALFileSystem(conf);
1122    if (!fs.exists(p)) {
1123      throw new FileNotFoundException(p.toString());
1124    }
1125    if (!fs.getFileStatus(p).isDirectory()) {
1126      throw new IOException(p + " is not a directory");
1127    }
1128
1129    final Path baseDir = FSUtils.getWALRootDir(conf);
1130    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1131    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
1132      AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
1133      archiveDir = new Path(archiveDir, p.getName());
1134    }
1135    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1136  }
1137
1138  private static void usage() {
1139    System.err.println("Usage: AbstractFSWAL <ARGS>");
1140    System.err.println("Arguments:");
1141    System.err.println(" --dump  Dump textual representation of passed one or more files");
1142    System.err.println("         For example: " +
1143      "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
1144    System.err.println(" --split Split the passed directory of WAL logs");
1145    System.err.println(
1146      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
1147  }
1148
1149  /**
1150   * Pass one or more log file names and it will either dump out a text version on
1151   * <code>stdout</code> or split the specified log files.
1152   */
1153  public static void main(String[] args) throws IOException {
1154    if (args.length < 2) {
1155      usage();
1156      System.exit(-1);
1157    }
1158    // either dump using the WALPrettyPrinter or split, depending on args
1159    if (args[0].compareTo("--dump") == 0) {
1160      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1161    } else if (args[0].compareTo("--perf") == 0) {
1162      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
1163      LOG.error(HBaseMarkers.FATAL,
1164        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
1165      System.exit(-1);
1166    } else if (args[0].compareTo("--split") == 0) {
1167      Configuration conf = HBaseConfiguration.create();
1168      for (int i = 1; i < args.length; i++) {
1169        try {
1170          Path logPath = new Path(args[i]);
1171          FSUtils.setFsDefault(conf, logPath);
1172          split(conf, logPath);
1173        } catch (IOException t) {
1174          t.printStackTrace(System.err);
1175          System.exit(-1);
1176        }
1177      }
1178    } else {
1179      usage();
1180      System.exit(-1);
1181    }
1182  }
1183}