001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.CompletionException;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Future;
027import java.util.function.BiConsumer;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Helper class for processing futures.
034 */
035@InterfaceAudience.Private
036public final class FutureUtils {
037
038  private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
039
040  private FutureUtils() {
041  }
042
043  /**
044   * This is method is used when you just want to add a listener to the given future. We will call
045   * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
046   * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
047   * suppress exceptions thrown from the code that completes the future, and this method will catch
048   * all the exception thrown from the {@code action} to catch possible code bugs.
049   * <p/>
050   * And the error phone check will always report FutureReturnValueIgnored because every method in
051   * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
052   * have one future that has not been checked. So we introduce this method and add a suppress
053   * warnings annotation here.
054   */
055  @SuppressWarnings("FutureReturnValueIgnored")
056  public static <T> void addListener(CompletableFuture<T> future,
057      BiConsumer<? super T, ? super Throwable> action) {
058    future.whenComplete((resp, error) -> {
059      try {
060        // See this post on stack overflow(shorten since the url is too long),
061        // https://s.apache.org/completionexception
062        // For a chain of CompleableFuture, only the first child CompletableFuture can get the
063        // original exception, others will get a CompletionException, which wraps the original
064        // exception. So here we unwrap it before passing it to the callback action.
065        action.accept(resp, unwrapCompletionException(error));
066      } catch (Throwable t) {
067        LOG.error("Unexpected error caught when processing CompletableFuture", t);
068      }
069    });
070  }
071
072  /**
073   * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
074   * exception is that we will call
075   * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
076   * @see #addListener(CompletableFuture, BiConsumer)
077   */
078  @SuppressWarnings("FutureReturnValueIgnored")
079  public static <T> void addListener(CompletableFuture<T> future,
080      BiConsumer<? super T, ? super Throwable> action, Executor executor) {
081    future.whenCompleteAsync((resp, error) -> {
082      try {
083        action.accept(resp, unwrapCompletionException(error));
084      } catch (Throwable t) {
085        LOG.error("Unexpected error caught when processing CompletableFuture", t);
086      }
087    }, executor);
088  }
089
090  /**
091   * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
092   * the callbacks in the given {@code executor}.
093   */
094  public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
095      Executor executor) {
096    CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
097    addListener(future, (r, e) -> {
098      if (e != null) {
099        wrappedFuture.completeExceptionally(e);
100      } else {
101        wrappedFuture.complete(r);
102      }
103    }, executor);
104    return wrappedFuture;
105  }
106
107  /**
108   * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
109   */
110  public static Throwable unwrapCompletionException(Throwable error) {
111    if (error instanceof CompletionException) {
112      Throwable cause = error.getCause();
113      if (cause != null) {
114        return cause;
115      }
116    }
117    return error;
118  }
119
120  // This method is used to record the stack trace that calling the FutureUtils.get method. As in
121  // async client, the retry will be done in the retry timer thread, so the exception we get from
122  // the CompletableFuture will have a stack trace starting from the root of the retry timer. If we
123  // just throw this exception out when calling future.get(by unwrapping the ExecutionException),
124  // the upper layer even can not know where is the exception thrown...
125  // See HBASE-22316.
126  private static void setStackTrace(Throwable error) {
127    StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace();
128    StackTraceElement[] originalStackTrace = error.getStackTrace();
129    StackTraceElement[] newStackTrace =
130      new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1];
131    System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length);
132    newStackTrace[localStackTrace.length] =
133      new StackTraceElement("--------Future", "get--------", null, -1);
134    System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1,
135      originalStackTrace.length);
136    error.setStackTrace(newStackTrace);
137  }
138
139  private static IOException rethrow(ExecutionException error) throws IOException {
140    Throwable cause = error.getCause();
141    if (cause instanceof IOException) {
142      setStackTrace(cause);
143      throw (IOException) cause;
144    } else if (cause instanceof RuntimeException) {
145      setStackTrace(cause);
146      throw (RuntimeException) cause;
147    } else if (cause instanceof Error) {
148      setStackTrace(cause);
149      throw (Error) cause;
150    } else {
151      throw new IOException(cause);
152    }
153  }
154
155  /**
156   * A helper class for getting the result of a Future, and convert the error to an
157   * {@link IOException}.
158   */
159  public static <T> T get(Future<T> future) throws IOException {
160    try {
161      return future.get();
162    } catch (InterruptedException e) {
163      throw (IOException) new InterruptedIOException().initCause(e);
164    } catch (ExecutionException e) {
165      throw rethrow(e);
166    }
167  }
168
169  /**
170   * Returns a CompletableFuture that is already completed exceptionally with the given exception.
171   */
172  public static <T> CompletableFuture<T> failedFuture(Throwable e) {
173    CompletableFuture<T> future = new CompletableFuture<>();
174    future.completeExceptionally(e);
175    return future;
176  }
177}