/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
 * Static helpers for working with {@link Future} and {@link CompletableFuture} instances.
 */
@InterfaceAudience.Private
public final class FutureUtils {

  private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);

  /** Not instantiable; this class only hosts static helpers. */
  private FutureUtils() {
  }

  /**
   * Registers {@code action} as a listener on {@code future} through
   * {@link CompletableFuture#whenComplete(BiConsumer)}, for callers that only want a callback and
   * have no use for the derived future.
   * <p>
   * Ignoring the return value of a future-returning call is considered bad practice because it
   * may hide exceptions thrown by the code that completes the future. To keep such failures
   * visible, everything thrown by {@code action} is caught here and logged at error level.
   * <p>
   * The error prone check always reports FutureReturnValueIgnored for this pattern, because every
   * method of {@link CompletableFuture} returns a new {@link CompletableFuture} and so there is
   * always one future left unchecked. This method is the single place where that warning is
   * suppressed.
   * @param future the future to listen on
   * @param action the callback; it receives the result and the error, where the error has been
   *          passed through {@link #unwrapCompletionException(Throwable)}
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  public static <T> void addListener(CompletableFuture<T> future,
    BiConsumer<? super T, ? super Throwable> action) {
    future.whenComplete(FutureUtils.<T> guarded(action));
  }

  /**
   * Same as {@link #addListener(CompletableFuture, BiConsumer)}, except that the listener is
   * registered through {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)} with the
   * given {@code executor}.
   * @param future the future to listen on
   * @param action the callback; it receives the result and the unwrapped error
   * @param executor the executor handed to {@code whenCompleteAsync}
   * @see #addListener(CompletableFuture, BiConsumer)
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  public static <T> void addListener(CompletableFuture<T> future,
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    future.whenCompleteAsync(FutureUtils.<T> guarded(action), executor);
  }

  /**
   * Decorates {@code action} with the handling shared by both {@code addListener} overloads: the
   * error is unwrapped before it reaches the callback, and anything the callback throws is logged
   * instead of propagating.
   */
  private static <T> BiConsumer<T, Throwable>
    guarded(BiConsumer<? super T, ? super Throwable> action) {
    return (resp, error) -> {
      try {
        // See this post on stack overflow (shortened since the url is too long),
        // https://s.apache.org/completionexception
        // In a chain of CompletableFutures, only the first child CompletableFuture gets the
        // original exception; the others get a CompletionException which wraps it. So unwrap
        // before handing the error to the callback.
        action.accept(resp, unwrapCompletionException(error));
      } catch (Throwable t) {
        LOG.error("Unexpected error caught when processing CompletableFuture", t);
      }
    };
  }

  /**
   * Creates a new {@link CompletableFuture} that is completed with the outcome of {@code future},
   * where the completion is performed by a listener registered with the given {@code executor}.
   * @param future the future whose outcome is relayed
   * @param executor the executor used to register the relaying listener
   * @return a new future carrying the same result, or the same (unwrapped) error, as
   *         {@code future}
   */
  public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
    Executor executor) {
    CompletableFuture<T> relay = new CompletableFuture<>();
    addListener(future, (result, failure) -> {
      if (failure == null) {
        relay.complete(result);
      } else {
        relay.completeExceptionally(failure);
      }
    }, executor);
    return relay;
  }

  /**
   * Strips one level of {@link CompletionException} wrapping.
   * @param error the throwable to inspect, may be {@code null}
   * @return the cause of {@code error} if it is a {@link CompletionException} with a non-null
   *         cause, otherwise {@code error} itself
   */
  public static Throwable unwrapCompletionException(Throwable error) {
    if (!(error instanceof CompletionException)) {
      return error;
    }
    Throwable cause = error.getCause();
    return cause != null ? cause : error;
  }

  /**
   * Waits for {@code future} and returns its result, converting failures to {@link IOException}.
   * @param future the future to wait on
   * @return the value produced by {@code future}
   * @throws IOException an {@link InterruptedIOException} (with the {@link InterruptedException}
   *           as its cause) if the wait is interrupted; otherwise the cause of the
   *           {@link ExecutionException}, wrapped in a new {@link IOException} unless
   *           {@code Throwables.propagateIfPossible} rethrows it directly
   */
  public static <T> T get(Future<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      InterruptedIOException interrupted = new InterruptedIOException();
      interrupted.initCause(e);
      throw interrupted;
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      // Per Guava's contract this rethrows the cause as-is when it is an IOException, a
      // RuntimeException or an Error; any other cause falls through and gets wrapped below.
      Throwables.propagateIfPossible(cause, IOException.class);
      throw new IOException(cause);
    }
  }

  /**
   * Returns a {@link CompletableFuture} that is already completed exceptionally with the given
   * exception.
   * @param e the exception to complete the future with
   * @return a new, exceptionally completed future
   */
  public static <T> CompletableFuture<T> failedFuture(Throwable e) {
    CompletableFuture<T> failed = new CompletableFuture<>();
    failed.completeExceptionally(e);
    return failed;
  }
}