/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility methods for recovering file leases from HDFS.
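 * <p>
 * A minimal usage sketch (the configuration and path below are illustrative, not prescribed by
 * this class):
 *
 * <pre>
 * Configuration conf = ...; // the cluster configuration
 * FileSystem fs = FileSystem.get(conf);
 * RecoverLeaseFSUtils.recoverFileLease(fs, new Path("/hbase/WALs/example"), conf);
 * </pre>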
 */
@InterfaceAudience.Private
public final class RecoverLeaseFSUtils {

  private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);

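  // Resolved via reflection so the same code runs against Hadoop releases that predate the
  // LeaseRecoverable interface (3.3.5 and earlier, where DistributedFileSystem#recoverLease is
  // called directly) as well as releases that provide it.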
  private static Class<?> leaseRecoverableClazz = null;
  private static Method recoverLeaseMethod = null;
  public static final String LEASE_RECOVERABLE_CLASS_NAME = "org.apache.hadoop.fs.LeaseRecoverable";
  static {
    LOG.debug("Initialize RecoverLeaseFSUtils");
    initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
  }

  /**
   * Initialize reflection classes and methods. If the LeaseRecoverable class is not found, fall
   * back to the DistributedFileSystem#recoverLease method.
   */
  static void initializeRecoverLeaseMethod(String className) {
    try {
      leaseRecoverableClazz = Class.forName(className);
      recoverLeaseMethod = leaseRecoverableClazz.getMethod("recoverLease", Path.class);
      LOG.debug("Set recoverLeaseMethod to {}.recoverLease()", className);
    } catch (ClassNotFoundException e) {
      LOG.debug(
        "LeaseRecoverable interface not in the classpath, this means Hadoop 3.3.5 or below.");
      try {
        recoverLeaseMethod = DistributedFileSystem.class.getMethod("recoverLease", Path.class);
      } catch (NoSuchMethodException ex) {
        LOG.error("Cannot find recoverLease method in DistributedFileSystem class. "
          + "This should never happen. Abort.", ex);
        throw new RuntimeException(ex);
      }
    } catch (NoSuchMethodException e) {
      LOG.error("Cannot find recoverLease method in LeaseRecoverable class. "
        + "This should never happen. Abort.", e);
      throw new RuntimeException(e);
    }
  }

  private RecoverLeaseFSUtils() {
  }

  public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
    throws IOException {
    recoverFileLease(fs, p, conf, null);
  }

  /**
   * Recover the lease on the given file from the Hadoop file system, retrying multiple times
   * until recovery succeeds, the configured timeout elapses, or the reporter cancels the
   * operation.
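   * <p>
   * For example (illustrative; {@code server} and its {@code isStopped()} check are hypothetical
   * caller-side names):
   *
   * <pre>
   * RecoverLeaseFSUtils.recoverFileLease(fs, walPath, conf, () -> !server.isStopped());
   * </pre>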
   */
  public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
    CancelableProgressable reporter) throws IOException {
    if (fs instanceof FilterFileSystem) {
      fs = ((FilterFileSystem) fs).getRawFileSystem();
    }

    // Lease recovery is not needed for file systems that do not support it, e.g. the local file
    // system.
    if (isLeaseRecoverable(fs)) {
      recoverDFSFileLease(fs, p, conf, reporter);
    }
  }

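  /**
   * Returns whether the given file system supports lease recovery: true for HDFS itself and for
   * any file system that implements the LeaseRecoverable interface.
   */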
  public static boolean isLeaseRecoverable(FileSystem fs) {
    // return true if HDFS.
    if (fs instanceof DistributedFileSystem) {
      return true;
    }
    // return true if the file system implements the LeaseRecoverable interface.
    if (leaseRecoverableClazz != null) {
      return leaseRecoverableClazz.isAssignableFrom(fs.getClass());
    }
    // return false if the file system is not HDFS and does not implement LeaseRecoverable.
    return false;
  }

  /*
   * Run the dfs recover lease. recoverLease is asynchronous. It returns:
   *   - false when it starts the lease recovery (i.e. lease recovery not *yet* done);
   *   - true when the lease recovery has succeeded or the file is closed.
   * But, we have to be careful. Each time we call recoverLease, it starts the recovery process
   * over from the beginning. We could put ourselves in a situation where we are doing nothing but
   * starting a recovery, interrupting it to start again, and so on. The findings in HBASE-8354
   * are that the namenode will try to recover the lease on the file's primary node. If all is
   * well, it should return near immediately. But, as is common, it is the very primary node that
   * has crashed and so the namenode will be stuck waiting on a socket timeout before it will ask
   * another datanode to start the recovery. It does not help if we call recoverLease in the
   * meantime; in particular, after the socket timeout, a recoverLease invocation will cause us to
   * start over from square one (possibly waiting on a socket timeout against the primary node).
   * So, below, we do the following:
   *   1. Call recoverLease.
   *   2. If it returns true, break.
   *   3. If it returns false, wait a few seconds and then call it again.
   *   4. If it returns true, break.
   *   5. If it returns false, wait for what we think the datanode socket timeout is
   *      (configurable) and then try again.
   *   6. If it returns true, break.
   *   7. If it returns false, repeat starting at step 5 above.
   * If HDFS-4525 (isFileClosed) is available, call it every second; we might be able to exit
   * early.
   */
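  // With the default settings this works out to, roughly: attempt 0, pause 4s, attempt 1, pause
  // up to 64s (polling isFileClosed every second), attempt 2, pause up to 128s, and so on, until
  // recovery succeeds or the 15 minute timeout elapses. This is illustrative arithmetic from the
  // defaults below, not a measured trace.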
  private static boolean recoverDFSFileLease(final FileSystem dfs, final Path p,
    final Configuration conf, final CancelableProgressable reporter) throws IOException {
    LOG.info("Recover lease on dfs file " + p);
    long startWaiting = EnvironmentEdgeManager.currentTime();
    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
    // beyond that limit 'to be safe'.
    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
    // This setting should be a little above what the cluster dfs heartbeat is set to.
    long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
    // This should be set to how long it will take us to time out against the primary datanode if
    // it is dead. We set it to 64 seconds, 4 seconds more than the default READ_TIMEOUT in HDFS,
    // the default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after
    // this timeout, further retries use linear backoff with this base, to avoid endless
    // preemptions when this value is not properly configured.
    long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000);

    Method isFileClosedMeth = null;
    // Whether we still need to look up the isFileClosed method via reflection.
    boolean findIsFileClosedMeth = true;
    boolean recovered = false;
    // We break out of the loop when lease recovery succeeds, we time out, or an exception is
    // thrown.
    for (int nbAttempt = 0; !recovered; nbAttempt++) {
      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
      if (recovered) {
        break;
      }
      checkIfCancelled(reporter);
      if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
        break;
      }
      try {
        // On the first time through, wait the short 'firstPause'.
        if (nbAttempt == 0) {
          Thread.sleep(firstPause);
        } else {
          // Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning, check
          // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though).
          long localStartWaiting = EnvironmentEdgeManager.currentTime();
          while (
            (EnvironmentEdgeManager.currentTime() - localStartWaiting)
                < subsequentPauseBase * nbAttempt
          ) {
            Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
            if (findIsFileClosedMeth) {
              try {
                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", Path.class);
              } catch (NoSuchMethodException nsme) {
                LOG.debug("isFileClosed not available");
              } finally {
                findIsFileClosedMeth = false;
              }
            }
            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
              recovered = true;
              break;
            }
            checkIfCancelled(reporter);
          }
        }
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
    return recovered;
  }
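  /**
   * Returns true, and logs a data-loss warning, if the configured recovery timeout has elapsed.
   */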
  private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
    final int nbAttempt, final Path p, final long startWaiting) {
    if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
      LOG.warn("Cannot recoverLease after trying for "
        + conf.getInt("hbase.lease.recovery.timeout", 900000)
        + "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; "
        + getLogMessageDetail(nbAttempt, p, startWaiting));
      return true;
    }
    return false;
  }

  /**
   * Try to recover the lease.
   * @return True if dfs#recoverLease returned true.
   */
  private static boolean recoverLease(final FileSystem dfs, final int nbAttempt, final Path p,
    final long startWaiting) throws FileNotFoundException {
    boolean recovered = false;
    try {
      recovered = (Boolean) recoverLeaseMethod.invoke(dfs, p);
      LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
        + getLogMessageDetail(nbAttempt, p, startWaiting));
    } catch (InvocationTargetException ite) {
      final Throwable e = ite.getCause();
      if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
        // HDFS throws this exception instead of a FileNotFoundException; translate it.
        throw new FileNotFoundException("The given WAL wasn't found at " + p);
      } else if (e instanceof FileNotFoundException) {
        throw (FileNotFoundException) e;
      }
      LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
    } catch (IllegalAccessException e) {
      LOG.error("Failed to call recoverLease on {}. Abort.", dfs, e);
      throw new RuntimeException(e);
    }
    return recovered;
  }

  /** Returns detail to append to any log message around lease recovery. */
  private static String getLogMessageDetail(final int nbAttempt, final Path p,
    final long startWaiting) {
    return "attempt=" + nbAttempt + " on file=" + p + " after "
      + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
  }

  /**
   * Call HDFS-4525 isFileClosed if it is available.
   * @return True if file is closed.
   */
  private static boolean isFileClosed(final FileSystem dfs, final Method m, final Path p) {
    try {
      return (Boolean) m.invoke(dfs, p);
    } catch (SecurityException e) {
      LOG.warn("No access", e);
    } catch (Exception e) {
      LOG.warn("Failed invocation for " + p.toString(), e);
    }
    return false;
  }
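  /**
   * Poll the progress reporter, if one was supplied, and abort with an InterruptedIOException if
   * it signals cancellation.
   */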
  private static void checkIfCancelled(final CancelableProgressable reporter)
    throws InterruptedIOException {
    if (reporter == null) {
      return;
    }
    if (!reporter.progress()) {
      throw new InterruptedIOException("Operation cancelled");
    }
  }
}