001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.concurrent.CompletableFuture; 023import java.util.concurrent.CompletionException; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.Executor; 026import java.util.concurrent.Future; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.TimeoutException; 029import java.util.function.BiConsumer; 030import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Helper class for processing futures. 037 */ 038@InterfaceAudience.Private 039public final class FutureUtils { 040 041 private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class); 042 043 private FutureUtils() { 044 } 045 046 /** 047 * This is method is used when you just want to add a listener to the given future. We will call 048 * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the 049 * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may 050 * suppress exceptions thrown from the code that completes the future, and this method will catch 051 * all the exception thrown from the {@code action} to catch possible code bugs. 052 * <p/> 053 * And the error phone check will always report FutureReturnValueIgnored because every method in 054 * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always 055 * have one future that has not been checked. So we introduce this method and add a suppress 056 * warnings annotation here. 057 */ 058 @SuppressWarnings("FutureReturnValueIgnored") 059 public static <T> void addListener(CompletableFuture<T> future, 060 BiConsumer<? super T, ? super Throwable> action) { 061 future.whenComplete((resp, error) -> { 062 try { 063 // See this post on stack overflow(shorten since the url is too long), 064 // https://s.apache.org/completionexception 065 // For a chain of CompleableFuture, only the first child CompletableFuture can get the 066 // original exception, others will get a CompletionException, which wraps the original 067 // exception. So here we unwrap it before passing it to the callback action. 068 action.accept(resp, unwrapCompletionException(error)); 069 } catch (Throwable t) { 070 LOG.error("Unexpected error caught when processing CompletableFuture", t); 071 } 072 }); 073 } 074 075 /** 076 * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only 077 * exception is that we will call 078 * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}. 079 * @see #addListener(CompletableFuture, BiConsumer) 080 */ 081 @SuppressWarnings("FutureReturnValueIgnored") 082 public static <T> void addListener(CompletableFuture<T> future, 083 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 084 future.whenCompleteAsync((resp, error) -> { 085 try { 086 action.accept(resp, unwrapCompletionException(error)); 087 } catch (Throwable t) { 088 LOG.error("Unexpected error caught when processing CompletableFuture", t); 089 } 090 }, executor); 091 } 092 093 /** 094 * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all 095 * the callbacks in the given {@code executor}. 096 */ 097 public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future, 098 Executor executor) { 099 CompletableFuture<T> wrappedFuture = new CompletableFuture<>(); 100 addListener(future, (r, e) -> { 101 if (e != null) { 102 wrappedFuture.completeExceptionally(e); 103 } else { 104 wrappedFuture.complete(r); 105 } 106 }, executor); 107 return wrappedFuture; 108 } 109 110 /** 111 * Get the cause of the {@link Throwable} if it is a {@link CompletionException}. 112 */ 113 public static Throwable unwrapCompletionException(Throwable error) { 114 if (error instanceof CompletionException) { 115 Throwable cause = error.getCause(); 116 if (cause != null) { 117 return cause; 118 } 119 } 120 return error; 121 } 122 123 // This method is used to record the stack trace that calling the FutureUtils.get method. As in 124 // async client, the retry will be done in the retry timer thread, so the exception we get from 125 // the CompletableFuture will have a stack trace starting from the root of the retry timer. If we 126 // just throw this exception out when calling future.get(by unwrapping the ExecutionException), 127 // the upper layer even can not know where is the exception thrown... 128 // See HBASE-22316. 129 private static void setStackTrace(Throwable error) { 130 StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace(); 131 StackTraceElement[] originalStackTrace = error.getStackTrace(); 132 StackTraceElement[] newStackTrace = 133 new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1]; 134 System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length); 135 newStackTrace[localStackTrace.length] = 136 new StackTraceElement("--------Future", "get--------", null, -1); 137 System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1, 138 originalStackTrace.length); 139 error.setStackTrace(newStackTrace); 140 } 141 142 private static IOException rethrow(ExecutionException error) throws IOException { 143 Throwable cause = error.getCause(); 144 if (cause instanceof IOException) { 145 setStackTrace(cause); 146 throw (IOException) cause; 147 } else if (cause instanceof RuntimeException) { 148 setStackTrace(cause); 149 throw (RuntimeException) cause; 150 } else if (cause instanceof Error) { 151 setStackTrace(cause); 152 throw (Error) cause; 153 } else { 154 throw new IOException(cause); 155 } 156 } 157 158 /** 159 * A helper class for getting the result of a Future, and convert the error to an 160 * {@link IOException}. 161 */ 162 public static <T> T get(Future<T> future) throws IOException { 163 try { 164 return future.get(); 165 } catch (InterruptedException e) { 166 throw (IOException) new InterruptedIOException().initCause(e); 167 } catch (ExecutionException e) { 168 throw rethrow(e); 169 } 170 } 171 172 /** 173 * A helper class for getting the result of a Future, and convert the error to an 174 * {@link IOException}. 175 */ 176 public static <T> T get(Future<T> future, long timeout, TimeUnit unit) throws IOException { 177 try { 178 return future.get(timeout, unit); 179 } catch (InterruptedException e) { 180 throw (IOException) new InterruptedIOException().initCause(e); 181 } catch (ExecutionException e) { 182 throw rethrow(e); 183 } catch (TimeoutException e) { 184 throw new TimeoutIOException(e); 185 } 186 } 187 188 /** 189 * Returns a CompletableFuture that is already completed exceptionally with the given exception. 190 */ 191 public static <T> CompletableFuture<T> failedFuture(Throwable e) { 192 CompletableFuture<T> future = new CompletableFuture<>(); 193 future.completeExceptionally(e); 194 return future; 195 } 196}