001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.CompletionException;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Future;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import java.util.function.BiConsumer;
030import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Helper class for processing futures.
037 */
038@InterfaceAudience.Private
039public final class FutureUtils {
040
041  private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
042
043  private FutureUtils() {
044  }
045
046  /**
047   * This is method is used when you just want to add a listener to the given future. We will call
048   * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
049   * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
050   * suppress exceptions thrown from the code that completes the future, and this method will catch
051   * all the exception thrown from the {@code action} to catch possible code bugs.
052   * <p/>
053   * And the error phone check will always report FutureReturnValueIgnored because every method in
054   * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
055   * have one future that has not been checked. So we introduce this method and add a suppress
056   * warnings annotation here.
057   */
058  @SuppressWarnings("FutureReturnValueIgnored")
059  public static <T> void addListener(CompletableFuture<T> future,
060      BiConsumer<? super T, ? super Throwable> action) {
061    future.whenComplete((resp, error) -> {
062      try {
063        // See this post on stack overflow(shorten since the url is too long),
064        // https://s.apache.org/completionexception
065        // For a chain of CompleableFuture, only the first child CompletableFuture can get the
066        // original exception, others will get a CompletionException, which wraps the original
067        // exception. So here we unwrap it before passing it to the callback action.
068        action.accept(resp, unwrapCompletionException(error));
069      } catch (Throwable t) {
070        LOG.error("Unexpected error caught when processing CompletableFuture", t);
071      }
072    });
073  }
074
075  /**
076   * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
077   * exception is that we will call
078   * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
079   * @see #addListener(CompletableFuture, BiConsumer)
080   */
081  @SuppressWarnings("FutureReturnValueIgnored")
082  public static <T> void addListener(CompletableFuture<T> future,
083      BiConsumer<? super T, ? super Throwable> action, Executor executor) {
084    future.whenCompleteAsync((resp, error) -> {
085      try {
086        action.accept(resp, unwrapCompletionException(error));
087      } catch (Throwable t) {
088        LOG.error("Unexpected error caught when processing CompletableFuture", t);
089      }
090    }, executor);
091  }
092
093  /**
094   * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
095   * the callbacks in the given {@code executor}.
096   */
097  public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
098      Executor executor) {
099    CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
100    addListener(future, (r, e) -> {
101      if (e != null) {
102        wrappedFuture.completeExceptionally(e);
103      } else {
104        wrappedFuture.complete(r);
105      }
106    }, executor);
107    return wrappedFuture;
108  }
109
110  /**
111   * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
112   */
113  public static Throwable unwrapCompletionException(Throwable error) {
114    if (error instanceof CompletionException) {
115      Throwable cause = error.getCause();
116      if (cause != null) {
117        return cause;
118      }
119    }
120    return error;
121  }
122
123  // This method is used to record the stack trace that calling the FutureUtils.get method. As in
124  // async client, the retry will be done in the retry timer thread, so the exception we get from
125  // the CompletableFuture will have a stack trace starting from the root of the retry timer. If we
126  // just throw this exception out when calling future.get(by unwrapping the ExecutionException),
127  // the upper layer even can not know where is the exception thrown...
128  // See HBASE-22316.
129  private static void setStackTrace(Throwable error) {
130    StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace();
131    StackTraceElement[] originalStackTrace = error.getStackTrace();
132    StackTraceElement[] newStackTrace =
133      new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1];
134    System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length);
135    newStackTrace[localStackTrace.length] =
136      new StackTraceElement("--------Future", "get--------", null, -1);
137    System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1,
138      originalStackTrace.length);
139    error.setStackTrace(newStackTrace);
140  }
141
142  /**
143   * If we could propagate the given {@code error} directly, we will fill the stack trace with the
144   * current thread's stack trace so it is easier to trace where is the exception thrown. If not, we
145   * will just create a new IOException and then throw it.
146   */
147  public static IOException rethrow(Throwable error) throws IOException {
148    if (error instanceof IOException) {
149      setStackTrace(error);
150      throw (IOException) error;
151    } else if (error instanceof RuntimeException) {
152      setStackTrace(error);
153      throw (RuntimeException) error;
154    } else if (error instanceof Error) {
155      setStackTrace(error);
156      throw (Error) error;
157    } else {
158      throw new IOException(error);
159    }
160  }
161
162  /**
163   * A helper class for getting the result of a Future, and convert the error to an
164   * {@link IOException}.
165   */
166  public static <T> T get(Future<T> future) throws IOException {
167    try {
168      return future.get();
169    } catch (InterruptedException e) {
170      throw (IOException) new InterruptedIOException().initCause(e);
171    } catch (ExecutionException e) {
172      throw rethrow(e.getCause());
173    }
174  }
175
176  /**
177   * A helper class for getting the result of a Future, and convert the error to an
178   * {@link IOException}.
179   */
180  public static <T> T get(Future<T> future, long timeout, TimeUnit unit) throws IOException {
181    try {
182      return future.get(timeout, unit);
183    } catch (InterruptedException e) {
184      throw (IOException) new InterruptedIOException().initCause(e);
185    } catch (ExecutionException e) {
186      throw rethrow(e.getCause());
187    } catch (TimeoutException e) {
188      throw new TimeoutIOException(e);
189    }
190  }
191
192  /**
193   * Returns a CompletableFuture that is already completed exceptionally with the given exception.
194   */
195  public static <T> CompletableFuture<T> failedFuture(Throwable e) {
196    CompletableFuture<T> future = new CompletableFuture<>();
197    future.completeExceptionally(e);
198    return future;
199  }
200}