001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021import java.util.ArrayList; 022import java.util.concurrent.CancellationException; 023import java.util.concurrent.ExecutionException; 024import java.util.concurrent.Executor; 025import java.util.concurrent.RunnableFuture; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.TimeoutException; 028 029import org.apache.hadoop.hbase.trace.TraceUtil; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034 035/** 036 * A completion service for the RpcRetryingCallerFactory. 037 * Keeps the list of the futures, and allows to cancel them all. 038 * This means as well that it can be used for a small set of tasks only. 039 * <br>Implementation is not Thread safe. 040 * 041 * CompletedTasks is implemented as a queue, the entry is added based on the time order. I.e, 042 * when the first task completes (whether it is a success or failure), it is added as a first 043 * entry in the queue, the next completed task is added as a second entry in the queue, ... 044 * When iterating through the queue, we know it is based on time order. If the first 045 * completed task succeeds, it is returned. If it is failure, the iteration goes on until it 046 * finds a success. 047 */ 048@InterfaceAudience.Private 049public class ResultBoundedCompletionService<V> { 050 private static final Logger LOG = LoggerFactory.getLogger(ResultBoundedCompletionService.class); 051 private final RpcRetryingCallerFactory retryingCallerFactory; 052 private final Executor executor; 053 private final QueueingFuture<V>[] tasks; // all the tasks 054 private final ArrayList<QueueingFuture> completedTasks; // completed tasks 055 private volatile boolean cancelled = false; 056 057 class QueueingFuture<T> implements RunnableFuture<T> { 058 private final RetryingCallable<T> future; 059 private T result = null; 060 private ExecutionException exeEx = null; 061 private volatile boolean cancelled = false; 062 private final int callTimeout; 063 private final RpcRetryingCaller<T> retryingCaller; 064 private boolean resultObtained = false; 065 private final int replicaId; // replica id 066 067 068 public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) { 069 this.future = future; 070 this.callTimeout = callTimeout; 071 this.retryingCaller = retryingCallerFactory.<T>newCaller(); 072 this.replicaId = id; 073 } 074 075 @SuppressWarnings("unchecked") 076 @Override 077 public void run() { 078 try { 079 if (!cancelled) { 080 result = this.retryingCaller.callWithRetries(future, callTimeout); 081 resultObtained = true; 082 } 083 } catch (Throwable t) { 084 exeEx = new ExecutionException(t); 085 } finally { 086 synchronized (tasks) { 087 // If this wasn't canceled then store the result. 088 if (!cancelled) { 089 completedTasks.add(QueueingFuture.this); 090 } 091 092 // Notify just in case there was someone waiting and this was canceled. 093 // That shouldn't happen but better safe than sorry. 094 tasks.notify(); 095 } 096 } 097 } 098 099 @Override 100 public boolean cancel(boolean mayInterruptIfRunning) { 101 if (resultObtained || exeEx != null) return false; 102 retryingCaller.cancel(); 103 if (future instanceof Cancellable) ((Cancellable)future).cancel(); 104 cancelled = true; 105 return true; 106 } 107 108 @Override 109 public boolean isCancelled() { 110 return cancelled; 111 } 112 113 @Override 114 public boolean isDone() { 115 return resultObtained || exeEx != null; 116 } 117 118 @Override 119 public T get() throws InterruptedException, ExecutionException { 120 try { 121 return get(1000, TimeUnit.DAYS); 122 } catch (TimeoutException e) { 123 throw new RuntimeException("You did wait for 1000 days here?", e); 124 } 125 } 126 127 @Override 128 public T get(long timeout, TimeUnit unit) 129 throws InterruptedException, ExecutionException, TimeoutException { 130 synchronized (tasks) { 131 if (resultObtained) { 132 return result; 133 } 134 if (exeEx != null) { 135 throw exeEx; 136 } 137 unit.timedWait(tasks, timeout); 138 } 139 if (resultObtained) { 140 return result; 141 } 142 if (exeEx != null) { 143 throw exeEx; 144 } 145 146 throw new TimeoutException("timeout=" + timeout + ", " + unit); 147 } 148 149 public int getReplicaId() { 150 return replicaId; 151 } 152 153 public ExecutionException getExeEx() { 154 return exeEx; 155 } 156 } 157 158 @SuppressWarnings("unchecked") 159 public ResultBoundedCompletionService( 160 RpcRetryingCallerFactory retryingCallerFactory, Executor executor, 161 int maxTasks) { 162 this.retryingCallerFactory = retryingCallerFactory; 163 this.executor = executor; 164 this.tasks = new QueueingFuture[maxTasks]; 165 this.completedTasks = new ArrayList<>(maxTasks); 166 } 167 168 169 public void submit(RetryingCallable<V> task, int callTimeout, int id) { 170 QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id); 171 executor.execute(TraceUtil.wrap(newFuture, "ResultBoundedCompletionService.submit")); 172 tasks[id] = newFuture; 173 } 174 175 public QueueingFuture<V> take() throws InterruptedException { 176 synchronized (tasks) { 177 while (!cancelled && (completedTasks.size() < 1)) tasks.wait(); 178 } 179 return completedTasks.get(0); 180 } 181 182 /** 183 * Poll for the first completed task whether it is a success or execution exception. 184 * 185 * @param timeout - time to wait before it times out 186 * @param unit - time unit for timeout 187 */ 188 public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException { 189 return pollForSpecificCompletedTask(timeout, unit, 0); 190 } 191 192 /** 193 * Poll for the first successfully completed task whose completed order is in startIndex, 194 * endIndex(exclusive) range 195 * 196 * @param timeout - time to wait before it times out 197 * @param unit - time unit for timeout 198 * @param startIndex - start index, starting from 0, inclusive 199 * @param endIndex - end index, exclusive 200 * 201 * @return If within timeout time, there is no successfully completed task, return null; If all 202 * tasks get execution exception, it will throw out the last execution exception, 203 * otherwise return the first successfully completed task's result. 204 */ 205 public QueueingFuture<V> pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit, 206 int startIndex, int endIndex) 207 throws InterruptedException, CancellationException, ExecutionException { 208 209 QueueingFuture<V> f; 210 long start, duration; 211 for (int i = startIndex; i < endIndex; i ++) { 212 213 start = EnvironmentEdgeManager.currentTime(); 214 f = pollForSpecificCompletedTask(timeout, unit, i); 215 duration = EnvironmentEdgeManager.currentTime() - start; 216 217 // Even with operationTimeout less than 0, still loop through the rest as there could 218 // be other completed tasks before operationTimeout. 219 timeout -= duration; 220 221 if (f == null) { 222 return null; 223 } else if (f.getExeEx() != null) { 224 // we continue here as we need to loop through all the results. 225 if (LOG.isDebugEnabled()) { 226 LOG.debug("Replica " + ((f == null) ? 0 : f.getReplicaId()) + " returns " + 227 f.getExeEx().getCause()); 228 } 229 230 if (i == (endIndex - 1)) { 231 // Rethrow this exception 232 throw f.getExeEx(); 233 } 234 continue; 235 } 236 237 return f; 238 } 239 240 // impossible to reach 241 return null; 242 } 243 244 /** 245 * Poll for the Nth completed task (index starts from 0 (the 1st), 1 (the second)...) 246 * 247 * @param timeout - time to wait before it times out 248 * @param unit - time unit for timeout 249 * @param index - the index(th) completed task, index starting from 0 250 */ 251 private QueueingFuture<V> pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index) 252 throws InterruptedException { 253 if (index < 0) { 254 return null; 255 } 256 257 synchronized (tasks) { 258 if (!cancelled && (completedTasks.size() <= index)) unit.timedWait(tasks, timeout); 259 if (completedTasks.size() <= index) return null; 260 } 261 return completedTasks.get(index); 262 } 263 264 public void cancelAll() { 265 // Grab the lock on tasks so that cancelled is visible everywhere 266 synchronized (tasks) { 267 cancelled = true; 268 } 269 for (QueueingFuture<V> future : tasks) { 270 if (future != null) future.cancel(true); 271 } 272 } 273}