/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A completion service for the RpcRetryingCallerFactory. Keeps the list of the futures, and allows
 * to cancel them all. This means as well that it can be used for a small set of tasks only. <br>
 * Implementation is not Thread safe. Completed tasks are kept in a list ordered by completion
 * time: the first task to complete (whether it succeeded or failed) is the first entry, the next
 * one to complete is the second entry, and so on. Iterating over that list therefore visits tasks
 * in completion order. If the first completed task succeeded, it is returned; if it failed, the
 * iteration goes on until it finds a success.
 */
@InterfaceAudience.Private
public class ResultBoundedCompletionService<V> {
  private static final Logger LOG = LoggerFactory.getLogger(ResultBoundedCompletionService.class);
  private final RpcRetryingCallerFactory retryingCallerFactory;
  private final Executor executor;
  // All the tasks, indexed by the id given to submit(). This array is also the monitor that guards
  // completedTasks and on which completion is signalled.
  private final QueueingFuture<V>[] tasks;
  // Completed tasks, in completion order. The element type is raw because QueueingFuture declares
  // its own type parameter; reads go through completedAt().
  private final ArrayList<QueueingFuture> completedTasks;
  private volatile boolean cancelled = false;

  /**
   * A future wrapping one retrying call. When the call finishes, the future appends itself to the
   * enclosing service's list of completed tasks (unless it was cancelled) and wakes up waiters.
   */
  class QueueingFuture<T> implements RunnableFuture<T> {
    private final RetryingCallable<T> future;
    private T result = null;
    private ExecutionException exeEx = null;
    private volatile boolean cancelled = false;
    private final int operationTimeout;
    private final RpcRetryingCaller<T> retryingCaller;
    private boolean resultObtained = false;
    // Set (while holding the tasks monitor) once run() has finished, whatever the outcome. Lets
    // get() tell a notification of this task apart from one sent by a sibling task.
    private boolean runFinished = false;
    private final int replicaId; // replica id

    public QueueingFuture(RetryingCallable<T> future, int rpcTimeout, int operationTimeout,
      int id) {
      this.future = future;
      this.operationTimeout = operationTimeout;
      this.retryingCaller = retryingCallerFactory.<T> newCaller(rpcTimeout);
      this.replicaId = id;
    }

    /**
     * Runs the wrapped call unless this future was already cancelled, records either the result
     * or the failure, then publishes the completion and notifies waiters.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void run() {
      try {
        if (!cancelled) {
          result = this.retryingCaller.callWithRetries(future, operationTimeout);
          resultObtained = true;
        }
      } catch (Throwable t) {
        exeEx = new ExecutionException(t);
      } finally {
        synchronized (tasks) {
          // If this wasn't canceled then store the result.
          if (!cancelled) {
            completedTasks.add(QueueingFuture.this);
          }
          runFinished = true;

          // Wake every waiter, including when this task was canceled: all waiters share this
          // monitor and each of them re-checks its own condition.
          tasks.notifyAll();
        }
      }
    }

    /**
     * Cancels the retrying caller and, if it is {@link Cancellable}, the wrapped callable.
     * @param mayInterruptIfRunning not used by this implementation
     * @return false if a result or a failure was already recorded, true otherwise
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      if (resultObtained || exeEx != null) {
        return false;
      }
      retryingCaller.cancel();
      if (future instanceof Cancellable) {
        ((Cancellable) future).cancel();
      }
      cancelled = true;
      return true;
    }

    @Override
    public boolean isCancelled() {
      return cancelled;
    }

    @Override
    public boolean isDone() {
      return resultObtained || exeEx != null;
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
      try {
        return get(1000, TimeUnit.DAYS);
      } catch (TimeoutException e) {
        throw new RuntimeException("You did wait for 1000 days here?", e);
      }
    }

    /**
     * Waits until this task has finished running or the timeout elapses. Notifications caused by
     * other tasks of the same service do not end the wait early.
     * @param timeout maximum time to wait; a value less than or equal to zero means no waiting
     * @param unit    time unit of {@code timeout}
     * @return the result of the call
     * @throws ExecutionException if the call failed
     * @throws TimeoutException   if no result and no failure is available when the wait ends
     */
    @Override
    public T get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
      synchronized (tasks) {
        long remainingNanos = unit.toNanos(timeout);
        while (!runFinished && remainingNanos > 0) {
          remainingNanos = waitOnTasks(remainingNanos);
        }
        if (resultObtained) {
          return result;
        }
        if (exeEx != null) {
          throw exeEx;
        }
      }

      throw new TimeoutException("timeout=" + timeout + ", " + unit);
    }

    public int getReplicaId() {
      return replicaId;
    }

    public ExecutionException getExeEx() {
      return exeEx;
    }
  }

  @SuppressWarnings("unchecked")
  public ResultBoundedCompletionService(RpcRetryingCallerFactory retryingCallerFactory,
    Executor executor, int maxTasks) {
    this.retryingCallerFactory = retryingCallerFactory;
    this.executor = executor;
    this.tasks = new QueueingFuture[maxTasks];
    this.completedTasks = new ArrayList<>(maxTasks);
  }

  /**
   * Wraps the callable in a future, hands it to the executor and remembers it in slot {@code id}.
   * @param task             the call to run
   * @param rpcTimeout       timeout used to create the retrying caller
   * @param operationTimeout timeout passed to the retrying call
   * @param id               slot of the task; must be a valid index, i.e. less than maxTasks
   */
  public void submit(RetryingCallable<V> task, int rpcTimeout, int operationTimeout, int id) {
    QueueingFuture<V> newFuture = new QueueingFuture<>(task, rpcTimeout, operationTimeout, id);
    // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
    executor.execute(newFuture);
    tasks[id] = newFuture;
  }

  /**
   * Waits, without timeout, for the first completed task.
   * @return the first completed task, or null if the service was cancelled before any task
   *         completed
   */
  public QueueingFuture<V> take() throws InterruptedException {
    synchronized (tasks) {
      while (!cancelled && completedTasks.isEmpty()) {
        tasks.wait();
      }
      // After cancelAll() there may be nothing to hand out.
      return completedTasks.isEmpty() ? null : completedAt(0);
    }
  }

  /**
   * Poll for the first completed task whether it is a success or execution exception.
   * @param timeout - time to wait before it times out
   * @param unit    - time unit for timeout
   * @return the first completed task, or null if there is none when the wait ends
   */
  public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
    return pollForSpecificCompletedTask(timeout, unit, 0);
  }

  /**
   * Poll for the first successfully completed task whose completed order is in startIndex,
   * endIndex(exclusive) range
   * @param timeout    - time to wait before it times out
   * @param unit       - time unit for timeout
   * @param startIndex - start index, starting from 0, inclusive
   * @param endIndex   - end index, exclusive
   * @return If within timeout time, there is no successfully completed task, return null; If all
   *         tasks get execution exception, it will throw out the last execution exception,
   *         otherwise return the first successfully completed task's result.
   */
  public QueueingFuture<V> pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit,
    int startIndex, int endIndex)
    throws InterruptedException, CancellationException, ExecutionException {

    for (int i = startIndex; i < endIndex; i++) {
      long start = EnvironmentEdgeManager.currentTime();
      QueueingFuture<V> f = pollForSpecificCompletedTask(timeout, unit, i);
      long duration = EnvironmentEdgeManager.currentTime() - start;

      // Even with operationTimeout less than 0, still loop through the rest as there could
      // be other completed tasks before operationTimeout.
      // The elapsed time is measured in milliseconds, so convert it to the caller's unit before
      // deducting it from the remaining timeout.
      timeout -= unit.convert(duration, TimeUnit.MILLISECONDS);

      if (f == null) {
        return null;
      }
      if (f.getExeEx() == null) {
        return f;
      }

      // This task failed; keep going as we need to loop through all the results.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Replica " + f.getReplicaId() + " returns " + f.getExeEx().getCause());
      }
      if (i == (endIndex - 1)) {
        // Rethrow this exception
        throw f.getExeEx();
      }
    }

    return null;
  }

  /**
   * Poll for the Nth completed task (index starts from 0 (the 1st), 1 (the second)...)
   * @param timeout - time to wait before it times out
   * @param unit    - time unit for timeout
   * @param index   - the index(th) completed task, index starting from 0
   * @return the requested task, or null if index is negative or fewer than index + 1 tasks have
   *         completed when the wait ends (timeout elapsed or service cancelled)
   */
  private QueueingFuture<V> pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index)
    throws InterruptedException {
    if (index < 0) {
      return null;
    }

    synchronized (tasks) {
      long remainingNanos = unit.toNanos(timeout);
      // Re-check after every wake-up: the notification may come from a cancelled task, which
      // does not add itself to completedTasks, or the wake-up may be spurious.
      while (!cancelled && completedTasks.size() <= index && remainingNanos > 0) {
        remainingNanos = waitOnTasks(remainingNanos);
      }
      return completedTasks.size() <= index ? null : completedAt(index);
    }
  }

  /**
   * Waits on the tasks monitor for at most the given time. The caller must hold the monitor.
   * @param remainingNanos maximum time to wait, in nanoseconds
   * @return the time left after the wait, in nanoseconds; may be zero or negative
   */
  private long waitOnTasks(long remainingNanos) throws InterruptedException {
    long start = System.nanoTime();
    TimeUnit.NANOSECONDS.timedWait(tasks, remainingNanos);
    return remainingNanos - (System.nanoTime() - start);
  }

  /**
   * Reads one entry of completedTasks with the service's element type. The caller must hold the
   * tasks monitor and pass an index below completedTasks.size().
   */
  @SuppressWarnings("unchecked")
  private QueueingFuture<V> completedAt(int index) {
    // Safe: only futures created by submit(), which are QueueingFuture<V>, are ever executed and
    // can therefore add themselves to the list.
    return completedTasks.get(index);
  }

  /**
   * Marks the service as cancelled, wakes up any waiter, and cancels every submitted task.
   */
  public void cancelAll() {
    // Grab the lock on tasks so that cancelled is visible everywhere
    synchronized (tasks) {
      cancelled = true;
      // Let threads blocked in take() or poll observe the cancellation right away.
      tasks.notifyAll();
    }
    for (QueueingFuture<V> future : tasks) {
      if (future != null) {
        future.cancel(true);
      }
    }
  }
}