001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.CompletionException;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Future;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import java.util.function.BiConsumer;
030import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Helper class for processing futures.
037 */
038@InterfaceAudience.Private
039public final class FutureUtils {
040
041  private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
042
043  private FutureUtils() {
044  }
045
046  /**
047   * This is method is used when you just want to add a listener to the given future. We will call
048   * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
049   * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
050   * suppress exceptions thrown from the code that completes the future, and this method will catch
051   * all the exception thrown from the {@code action} to catch possible code bugs.
052   * <p/>
053   * And the error phone check will always report FutureReturnValueIgnored because every method in
054   * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
055   * have one future that has not been checked. So we introduce this method and add a suppress
056   * warnings annotation here.
057   */
058  @SuppressWarnings("FutureReturnValueIgnored")
059  public static <T> void addListener(CompletableFuture<T> future,
060      BiConsumer<? super T, ? super Throwable> action) {
061    future.whenComplete((resp, error) -> {
062      try {
063        // See this post on stack overflow(shorten since the url is too long),
064        // https://s.apache.org/completionexception
065        // For a chain of CompleableFuture, only the first child CompletableFuture can get the
066        // original exception, others will get a CompletionException, which wraps the original
067        // exception. So here we unwrap it before passing it to the callback action.
068        action.accept(resp, unwrapCompletionException(error));
069      } catch (Throwable t) {
070        LOG.error("Unexpected error caught when processing CompletableFuture", t);
071      }
072    });
073  }
074
075  /**
076   * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
077   * exception is that we will call
078   * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
079   * @see #addListener(CompletableFuture, BiConsumer)
080   */
081  @SuppressWarnings("FutureReturnValueIgnored")
082  public static <T> void addListener(CompletableFuture<T> future,
083      BiConsumer<? super T, ? super Throwable> action, Executor executor) {
084    future.whenCompleteAsync((resp, error) -> {
085      try {
086        action.accept(resp, unwrapCompletionException(error));
087      } catch (Throwable t) {
088        LOG.error("Unexpected error caught when processing CompletableFuture", t);
089      }
090    }, executor);
091  }
092
093  /**
094   * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
095   * the callbacks in the given {@code executor}.
096   */
097  public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
098      Executor executor) {
099    CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
100    addListener(future, (r, e) -> {
101      if (e != null) {
102        wrappedFuture.completeExceptionally(e);
103      } else {
104        wrappedFuture.complete(r);
105      }
106    }, executor);
107    return wrappedFuture;
108  }
109
110  /**
111   * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
112   */
113  public static Throwable unwrapCompletionException(Throwable error) {
114    if (error instanceof CompletionException) {
115      Throwable cause = error.getCause();
116      if (cause != null) {
117        return cause;
118      }
119    }
120    return error;
121  }
122
123  // This method is used to record the stack trace that calling the FutureUtils.get method. As in
124  // async client, the retry will be done in the retry timer thread, so the exception we get from
125  // the CompletableFuture will have a stack trace starting from the root of the retry timer. If we
126  // just throw this exception out when calling future.get(by unwrapping the ExecutionException),
127  // the upper layer even can not know where is the exception thrown...
128  // See HBASE-22316.
129  private static void setStackTrace(Throwable error) {
130    StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace();
131    StackTraceElement[] originalStackTrace = error.getStackTrace();
132    StackTraceElement[] newStackTrace =
133      new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1];
134    System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length);
135    newStackTrace[localStackTrace.length] =
136      new StackTraceElement("--------Future", "get--------", null, -1);
137    System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1,
138      originalStackTrace.length);
139    error.setStackTrace(newStackTrace);
140  }
141
142  private static IOException rethrow(ExecutionException error) throws IOException {
143    Throwable cause = error.getCause();
144    if (cause instanceof IOException) {
145      setStackTrace(cause);
146      throw (IOException) cause;
147    } else if (cause instanceof RuntimeException) {
148      setStackTrace(cause);
149      throw (RuntimeException) cause;
150    } else if (cause instanceof Error) {
151      setStackTrace(cause);
152      throw (Error) cause;
153    } else {
154      throw new IOException(cause);
155    }
156  }
157
158  /**
159   * A helper class for getting the result of a Future, and convert the error to an
160   * {@link IOException}.
161   */
162  public static <T> T get(Future<T> future) throws IOException {
163    try {
164      return future.get();
165    } catch (InterruptedException e) {
166      throw (IOException) new InterruptedIOException().initCause(e);
167    } catch (ExecutionException e) {
168      throw rethrow(e);
169    }
170  }
171
172  /**
173   * A helper class for getting the result of a Future, and convert the error to an
174   * {@link IOException}.
175   */
176  public static <T> T get(Future<T> future, long timeout, TimeUnit unit) throws IOException {
177    try {
178      return future.get(timeout, unit);
179    } catch (InterruptedException e) {
180      throw (IOException) new InterruptedIOException().initCause(e);
181    } catch (ExecutionException e) {
182      throw rethrow(e);
183    } catch (TimeoutException e) {
184      throw new TimeoutIOException(e);
185    }
186  }
187
188  /**
189   * Returns a CompletableFuture that is already completed exceptionally with the given exception.
190   */
191  public static <T> CompletableFuture<T> failedFuture(Throwable e) {
192    CompletableFuture<T> future = new CompletableFuture<>();
193    future.completeExceptionally(e);
194    return future;
195  }
196}