001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.CompletionException; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.Executor; 026import java.util.concurrent.Future; 027import java.util.function.BiConsumer; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Helper class for processing futures. 034 */ 035@InterfaceAudience.Private 036public final class FutureUtils { 037 038 private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class); 039 040 private FutureUtils() { 041 } 042 043 /** 044 * This is method is used when you just want to add a listener to the given future. We will call 045 * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the 046 * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may 047 * suppress exceptions thrown from the code that completes the future, and this method will catch 048 * all the exception thrown from the {@code action} to catch possible code bugs. 049 * <p/> 050 * And the error phone check will always report FutureReturnValueIgnored because every method in 051 * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always 052 * have one future that has not been checked. So we introduce this method and add a suppress 053 * warnings annotation here. 054 */ 055 @SuppressWarnings("FutureReturnValueIgnored") 056 public static <T> void addListener(CompletableFuture<T> future, 057 BiConsumer<? super T, ? super Throwable> action) { 058 future.whenComplete((resp, error) -> { 059 try { 060 // See this post on stack overflow(shorten since the url is too long), 061 // https://s.apache.org/completionexception 062 // For a chain of CompleableFuture, only the first child CompletableFuture can get the 063 // original exception, others will get a CompletionException, which wraps the original 064 // exception. So here we unwrap it before passing it to the callback action. 065 action.accept(resp, unwrapCompletionException(error)); 066 } catch (Throwable t) { 067 LOG.error("Unexpected error caught when processing CompletableFuture", t); 068 } 069 }); 070 } 071 072 /** 073 * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only 074 * exception is that we will call 075 * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}. 076 * @see #addListener(CompletableFuture, BiConsumer) 077 */ 078 @SuppressWarnings("FutureReturnValueIgnored") 079 public static <T> void addListener(CompletableFuture<T> future, 080 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 081 future.whenCompleteAsync((resp, error) -> { 082 try { 083 action.accept(resp, unwrapCompletionException(error)); 084 } catch (Throwable t) { 085 LOG.error("Unexpected error caught when processing CompletableFuture", t); 086 } 087 }, executor); 088 } 089 090 /** 091 * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all 092 * the callbacks in the given {@code executor}. 093 */ 094 public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future, 095 Executor executor) { 096 CompletableFuture<T> wrappedFuture = new CompletableFuture<>(); 097 addListener(future, (r, e) -> { 098 if (e != null) { 099 wrappedFuture.completeExceptionally(e); 100 } else { 101 wrappedFuture.complete(r); 102 } 103 }, executor); 104 return wrappedFuture; 105 } 106 107 /** 108 * Get the cause of the {@link Throwable} if it is a {@link CompletionException}. 109 */ 110 public static Throwable unwrapCompletionException(Throwable error) { 111 if (error instanceof CompletionException) { 112 Throwable cause = error.getCause(); 113 if (cause != null) { 114 return cause; 115 } 116 } 117 return error; 118 } 119 120 // This method is used to record the stack trace that calling the FutureUtils.get method. As in 121 // async client, the retry will be done in the retry timer thread, so the exception we get from 122 // the CompletableFuture will have a stack trace starting from the root of the retry timer. If we 123 // just throw this exception out when calling future.get(by unwrapping the ExecutionException), 124 // the upper layer even can not know where is the exception thrown... 125 // See HBASE-22316. 126 private static void setStackTrace(Throwable error) { 127 StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace(); 128 StackTraceElement[] originalStackTrace = error.getStackTrace(); 129 StackTraceElement[] newStackTrace = 130 new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1]; 131 System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length); 132 newStackTrace[localStackTrace.length] = 133 new StackTraceElement("--------Future", "get--------", null, -1); 134 System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1, 135 originalStackTrace.length); 136 error.setStackTrace(newStackTrace); 137 } 138 139 private static IOException rethrow(ExecutionException error) throws IOException { 140 Throwable cause = error.getCause(); 141 if (cause instanceof IOException) { 142 setStackTrace(cause); 143 throw (IOException) cause; 144 } else if (cause instanceof RuntimeException) { 145 setStackTrace(cause); 146 throw (RuntimeException) cause; 147 } else if (cause instanceof Error) { 148 setStackTrace(cause); 149 throw (Error) cause; 150 } else { 151 throw new IOException(cause); 152 } 153 } 154 155 /** 156 * A helper class for getting the result of a Future, and convert the error to an 157 * {@link IOException}. 158 */ 159 public static <T> T get(Future<T> future) throws IOException { 160 try { 161 return future.get(); 162 } catch (InterruptedException e) { 163 throw (IOException) new InterruptedIOException().initCause(e); 164 } catch (ExecutionException e) { 165 throw rethrow(e); 166 } 167 } 168 169 /** 170 * Returns a CompletableFuture that is already completed exceptionally with the given exception. 171 */ 172 public static <T> CompletableFuture<T> failedFuture(Throwable e) { 173 CompletableFuture<T> future = new CompletableFuture<>(); 174 future.completeExceptionally(e); 175 return future; 176 } 177}