001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import java.util.ArrayList;
022import java.util.concurrent.CancellationException;
023import java.util.concurrent.ExecutionException;
024import java.util.concurrent.Executor;
025import java.util.concurrent.RunnableFuture;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.TimeoutException;
028
029import org.apache.hadoop.hbase.trace.TraceUtil;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034
035/**
036 * A completion service for the RpcRetryingCallerFactory.
037 * Keeps the list of the futures, and allows to cancel them all.
038 * This means as well that it can be used for a small set of tasks only.
039 * <br>Implementation is not Thread safe.
040 *
041 * CompletedTasks is implemented as a queue, the entry is added based on the time order. I.e,
042 * when the first task completes (whether it is a success or failure), it is added as a first
043 * entry in the queue, the next completed task is added as a second entry in the queue, ...
044 * When iterating through the queue, we know it is based on time order. If the first
045 * completed task succeeds, it is returned. If it is failure, the iteration goes on until it
046 * finds a success.
047 */
048@InterfaceAudience.Private
049public class ResultBoundedCompletionService<V> {
050  private static final Logger LOG = LoggerFactory.getLogger(ResultBoundedCompletionService.class);
051  private final RpcRetryingCallerFactory retryingCallerFactory;
052  private final Executor executor;
053  private final QueueingFuture<V>[] tasks; // all the tasks
054  private final ArrayList<QueueingFuture> completedTasks; // completed tasks
055  private volatile boolean cancelled = false;
056  
057  class QueueingFuture<T> implements RunnableFuture<T> {
058    private final RetryingCallable<T> future;
059    private T result = null;
060    private ExecutionException exeEx = null;
061    private volatile boolean cancelled = false;
062    private final int callTimeout;
063    private final RpcRetryingCaller<T> retryingCaller;
064    private boolean resultObtained = false;
065    private final int replicaId;  // replica id
066
067
068    public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
069      this.future = future;
070      this.callTimeout = callTimeout;
071      this.retryingCaller = retryingCallerFactory.<T>newCaller();
072      this.replicaId = id;
073    }
074
075    @SuppressWarnings("unchecked")
076    @Override
077    public void run() {
078      try {
079        if (!cancelled) {
080          result = this.retryingCaller.callWithRetries(future, callTimeout);
081          resultObtained = true;
082        }
083      } catch (Throwable t) {
084        exeEx = new ExecutionException(t);
085      } finally {
086        synchronized (tasks) {
087          // If this wasn't canceled then store the result.
088          if (!cancelled) {
089            completedTasks.add(QueueingFuture.this);
090          }
091
092          // Notify just in case there was someone waiting and this was canceled.
093          // That shouldn't happen but better safe than sorry.
094          tasks.notify();
095        }
096      }
097    }
098
099    @Override
100    public boolean cancel(boolean mayInterruptIfRunning) {
101      if (resultObtained || exeEx != null) return false;
102      retryingCaller.cancel();
103      if (future instanceof Cancellable) ((Cancellable)future).cancel();
104      cancelled = true;
105      return true;
106    }
107
108    @Override
109    public boolean isCancelled() {
110      return cancelled;
111    }
112
113    @Override
114    public boolean isDone() {
115      return resultObtained || exeEx != null;
116    }
117
118    @Override
119    public T get() throws InterruptedException, ExecutionException {
120      try {
121        return get(1000, TimeUnit.DAYS);
122      } catch (TimeoutException e) {
123        throw new RuntimeException("You did wait for 1000 days here?", e);
124      }
125    }
126
127    @Override
128    public T get(long timeout, TimeUnit unit)
129        throws InterruptedException, ExecutionException, TimeoutException {
130      synchronized (tasks) {
131        if (resultObtained) {
132          return result;
133        }
134        if (exeEx != null) {
135          throw exeEx;
136        }
137        unit.timedWait(tasks, timeout);
138      }
139      if (resultObtained) {
140        return result;
141      }
142      if (exeEx != null) {
143        throw exeEx;
144      }
145
146      throw new TimeoutException("timeout=" + timeout + ", " + unit);
147    }
148
149    public int getReplicaId() {
150      return replicaId;
151    }
152
153    public ExecutionException getExeEx() {
154      return exeEx;
155    }
156  }
157
158  @SuppressWarnings("unchecked")
159  public ResultBoundedCompletionService(
160      RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
161      int maxTasks) {
162    this.retryingCallerFactory = retryingCallerFactory;
163    this.executor = executor;
164    this.tasks = new QueueingFuture[maxTasks];
165    this.completedTasks = new ArrayList<>(maxTasks);
166  }
167
168
169  public void submit(RetryingCallable<V> task, int callTimeout, int id) {
170    QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id);
171    executor.execute(TraceUtil.wrap(newFuture, "ResultBoundedCompletionService.submit"));
172    tasks[id] = newFuture;
173  }
174
175  public QueueingFuture<V> take() throws InterruptedException {
176    synchronized (tasks) {
177      while (!cancelled && (completedTasks.size() < 1)) tasks.wait();
178    }
179    return completedTasks.get(0);
180  }
181
182  /**
183   * Poll for the first completed task whether it is a success or execution exception.
184   *
185   * @param timeout  - time to wait before it times out
186   * @param unit  - time unit for timeout
187   */
188  public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
189    return pollForSpecificCompletedTask(timeout, unit, 0);
190  }
191
192  /**
193   * Poll for the first successfully completed task whose completed order is in startIndex,
194   * endIndex(exclusive) range
195   *
196   * @param timeout  - time to wait before it times out
197   * @param unit  - time unit for timeout
198   * @param startIndex - start index, starting from 0, inclusive
199   * @param endIndex - end index, exclusive
200   *
201   * @return If within timeout time, there is no successfully completed task, return null; If all
202   *         tasks get execution exception, it will throw out the last execution exception,
203   *         otherwise return the first successfully completed task's result.
204   */
205  public QueueingFuture<V> pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit,
206      int startIndex, int endIndex)
207      throws InterruptedException, CancellationException, ExecutionException {
208
209    QueueingFuture<V>  f;
210    long start, duration;
211    for (int i = startIndex; i < endIndex; i ++) {
212
213      start = EnvironmentEdgeManager.currentTime();
214      f = pollForSpecificCompletedTask(timeout, unit, i);
215      duration = EnvironmentEdgeManager.currentTime() - start;
216
217      // Even with operationTimeout less than 0, still loop through the rest as there could
218      // be other completed tasks before operationTimeout.
219      timeout -= duration;
220
221      if (f == null) {
222        return null;
223      } else if (f.getExeEx() != null) {
224        // we continue here as we need to loop through all the results.
225        if (LOG.isDebugEnabled()) {
226          LOG.debug("Replica " + ((f == null) ? 0 : f.getReplicaId()) + " returns " +
227              f.getExeEx().getCause());
228        }
229
230        if (i == (endIndex - 1)) {
231          // Rethrow this exception
232          throw f.getExeEx();
233        }
234        continue;
235      }
236
237      return f;
238    }
239
240    // impossible to reach
241    return null;
242  }
243
244  /**
245   * Poll for the Nth completed task (index starts from 0 (the 1st), 1 (the second)...)
246   *
247   * @param timeout  - time to wait before it times out
248   * @param unit  - time unit for timeout
249   * @param index - the index(th) completed task, index starting from 0
250   */
251  private QueueingFuture<V> pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index)
252      throws InterruptedException {
253    if (index < 0) {
254      return null;
255    }
256
257    synchronized (tasks) {
258      if (!cancelled && (completedTasks.size() <= index)) unit.timedWait(tasks, timeout);
259      if (completedTasks.size() <= index) return null;
260    }
261    return completedTasks.get(index);
262  }
263
264  public void cancelAll() {
265    // Grab the lock on tasks so that cancelled is visible everywhere
266    synchronized (tasks) {
267      cancelled = true;
268    }
269    for (QueueingFuture<V> future : tasks) {
270      if (future != null) future.cancel(true);
271    }
272  }
273}