001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.CompletableFuture;
023import java.util.concurrent.CompletionException;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Future;
027import java.util.function.BiConsumer;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
033
034/**
035 * Helper class for processing futures.
036 */
037@InterfaceAudience.Private
038public final class FutureUtils {
039
040  private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
041
042  private FutureUtils() {
043  }
044
045  /**
046   * This is method is used when you just want to add a listener to the given future. We will call
047   * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
048   * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
049   * suppress exceptions thrown from the code that completes the future, and this method will catch
050   * all the exception thrown from the {@code action} to catch possible code bugs.
051   * <p/>
052   * And the error phone check will always report FutureReturnValueIgnored because every method in
053   * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
054   * have one future that has not been checked. So we introduce this method and add a suppress
055   * warnings annotation here.
056   */
057  @SuppressWarnings("FutureReturnValueIgnored")
058  public static <T> void addListener(CompletableFuture<T> future,
059      BiConsumer<? super T, ? super Throwable> action) {
060    future.whenComplete((resp, error) -> {
061      try {
062        // See this post on stack overflow(shorten since the url is too long),
063        // https://s.apache.org/completionexception
064        // For a chain of CompleableFuture, only the first child CompletableFuture can get the
065        // original exception, others will get a CompletionException, which wraps the original
066        // exception. So here we unwrap it before passing it to the callback action.
067        action.accept(resp, unwrapCompletionException(error));
068      } catch (Throwable t) {
069        LOG.error("Unexpected error caught when processing CompletableFuture", t);
070      }
071    });
072  }
073
074  /**
075   * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
076   * exception is that we will call
077   * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
078   * @see #addListener(CompletableFuture, BiConsumer)
079   */
080  @SuppressWarnings("FutureReturnValueIgnored")
081  public static <T> void addListener(CompletableFuture<T> future,
082      BiConsumer<? super T, ? super Throwable> action, Executor executor) {
083    future.whenCompleteAsync((resp, error) -> {
084      try {
085        action.accept(resp, unwrapCompletionException(error));
086      } catch (Throwable t) {
087        LOG.error("Unexpected error caught when processing CompletableFuture", t);
088      }
089    }, executor);
090  }
091
092  /**
093   * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
094   * the callbacks in the given {@code executor}.
095   */
096  public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
097      Executor executor) {
098    CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
099    addListener(future, (r, e) -> {
100      if (e != null) {
101        wrappedFuture.completeExceptionally(e);
102      } else {
103        wrappedFuture.complete(r);
104      }
105    }, executor);
106    return wrappedFuture;
107  }
108
109  /**
110   * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
111   */
112  public static Throwable unwrapCompletionException(Throwable error) {
113    if (error instanceof CompletionException) {
114      Throwable cause = error.getCause();
115      if (cause != null) {
116        return cause;
117      }
118    }
119    return error;
120  }
121
122  /**
123   * A helper class for getting the result of a Future, and convert the error to an
124   * {@link IOException}.
125   */
126  public static <T> T get(Future<T> future) throws IOException {
127    try {
128      return future.get();
129    } catch (InterruptedException e) {
130      throw (IOException) new InterruptedIOException().initCause(e);
131    } catch (ExecutionException e) {
132      Throwable cause = e.getCause();
133      Throwables.propagateIfPossible(cause, IOException.class);
134      throw new IOException(cause);
135    }
136  }
137
138  /**
139   * Returns a CompletableFuture that is already completed exceptionally with the given exception.
140   */
141  public static <T> CompletableFuture<T> failedFuture(Throwable e) {
142    CompletableFuture<T> future = new CompletableFuture<>();
143    future.completeExceptionally(e);
144    return future;
145  }
146}