001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.util.ArrayList;
021import java.util.concurrent.CancellationException;
022import java.util.concurrent.ExecutionException;
023import java.util.concurrent.Executor;
024import java.util.concurrent.RunnableFuture;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.TimeoutException;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * A completion service for the RpcRetryingCallerFactory. Keeps the list of the futures, and allows
034 * to cancel them all. This means as well that it can be used for a small set of tasks only. <br>
035 * Implementation is not Thread safe. CompletedTasks is implemented as a queue, the entry is added
036 * based on the time order. I.e, when the first task completes (whether it is a success or failure),
037 * it is added as a first entry in the queue, the next completed task is added as a second entry in
038 * the queue, ... When iterating through the queue, we know it is based on time order. If the first
039 * completed task succeeds, it is returned. If it is failure, the iteration goes on until it finds a
040 * success.
041 */
042@InterfaceAudience.Private
043public class ResultBoundedCompletionService<V> {
044  private static final Logger LOG = LoggerFactory.getLogger(ResultBoundedCompletionService.class);
045  private final RpcRetryingCallerFactory retryingCallerFactory;
046  private final Executor executor;
047  private final QueueingFuture<V>[] tasks; // all the tasks
048  private final ArrayList<QueueingFuture> completedTasks; // completed tasks
049  private volatile boolean cancelled = false;
050
051  class QueueingFuture<T> implements RunnableFuture<T> {
052    private final RetryingCallable<T> future;
053    private T result = null;
054    private ExecutionException exeEx = null;
055    private volatile boolean cancelled = false;
056    private final int operationTimeout;
057    private final RpcRetryingCaller<T> retryingCaller;
058    private boolean resultObtained = false;
059    private final int replicaId; // replica id
060
061    public QueueingFuture(RetryingCallable<T> future, int rpcTimeout, int operationTimeout,
062      int id) {
063      this.future = future;
064      this.operationTimeout = operationTimeout;
065      this.retryingCaller = retryingCallerFactory.<T> newCaller(rpcTimeout);
066      this.replicaId = id;
067    }
068
069    @SuppressWarnings("unchecked")
070    @Override
071    public void run() {
072      try {
073        if (!cancelled) {
074          result = this.retryingCaller.callWithRetries(future, operationTimeout);
075          resultObtained = true;
076        }
077      } catch (Throwable t) {
078        exeEx = new ExecutionException(t);
079      } finally {
080        synchronized (tasks) {
081          // If this wasn't canceled then store the result.
082          if (!cancelled) {
083            completedTasks.add(QueueingFuture.this);
084          }
085
086          // Notify just in case there was someone waiting and this was canceled.
087          // That shouldn't happen but better safe than sorry.
088          tasks.notify();
089        }
090      }
091    }
092
093    @Override
094    public boolean cancel(boolean mayInterruptIfRunning) {
095      if (resultObtained || exeEx != null) return false;
096      retryingCaller.cancel();
097      if (future instanceof Cancellable) ((Cancellable) future).cancel();
098      cancelled = true;
099      return true;
100    }
101
102    @Override
103    public boolean isCancelled() {
104      return cancelled;
105    }
106
107    @Override
108    public boolean isDone() {
109      return resultObtained || exeEx != null;
110    }
111
112    @Override
113    public T get() throws InterruptedException, ExecutionException {
114      try {
115        return get(1000, TimeUnit.DAYS);
116      } catch (TimeoutException e) {
117        throw new RuntimeException("You did wait for 1000 days here?", e);
118      }
119    }
120
121    @Override
122    public T get(long timeout, TimeUnit unit)
123      throws InterruptedException, ExecutionException, TimeoutException {
124      synchronized (tasks) {
125        if (resultObtained) {
126          return result;
127        }
128        if (exeEx != null) {
129          throw exeEx;
130        }
131        unit.timedWait(tasks, timeout);
132      }
133      if (resultObtained) {
134        return result;
135      }
136      if (exeEx != null) {
137        throw exeEx;
138      }
139
140      throw new TimeoutException("timeout=" + timeout + ", " + unit);
141    }
142
143    public int getReplicaId() {
144      return replicaId;
145    }
146
147    public ExecutionException getExeEx() {
148      return exeEx;
149    }
150  }
151
152  @SuppressWarnings("unchecked")
153  public ResultBoundedCompletionService(RpcRetryingCallerFactory retryingCallerFactory,
154    Executor executor, int maxTasks) {
155    this.retryingCallerFactory = retryingCallerFactory;
156    this.executor = executor;
157    this.tasks = new QueueingFuture[maxTasks];
158    this.completedTasks = new ArrayList<>(maxTasks);
159  }
160
161  public void submit(RetryingCallable<V> task, int rpcTimeout, int operationTimeout, int id) {
162    QueueingFuture<V> newFuture = new QueueingFuture<>(task, rpcTimeout, operationTimeout, id);
163    // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
164    executor.execute(newFuture);
165    tasks[id] = newFuture;
166  }
167
168  public QueueingFuture<V> take() throws InterruptedException {
169    synchronized (tasks) {
170      while (!cancelled && (completedTasks.size() < 1))
171        tasks.wait();
172    }
173    return completedTasks.get(0);
174  }
175
176  /**
177   * Poll for the first completed task whether it is a success or execution exception.
178   * @param timeout - time to wait before it times out
179   * @param unit    - time unit for timeout
180   */
181  public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
182    return pollForSpecificCompletedTask(timeout, unit, 0);
183  }
184
185  /**
186   * Poll for the first successfully completed task whose completed order is in startIndex,
187   * endIndex(exclusive) range
188   * @param timeout    - time to wait before it times out
189   * @param unit       - time unit for timeout
190   * @param startIndex - start index, starting from 0, inclusive
191   * @param endIndex   - end index, exclusive
192   * @return If within timeout time, there is no successfully completed task, return null; If all
193   *         tasks get execution exception, it will throw out the last execution exception,
194   *         otherwise return the first successfully completed task's result.
195   */
196  public QueueingFuture<V> pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit,
197    int startIndex, int endIndex)
198    throws InterruptedException, CancellationException, ExecutionException {
199
200    QueueingFuture<V> f;
201    long start, duration;
202    for (int i = startIndex; i < endIndex; i++) {
203
204      start = EnvironmentEdgeManager.currentTime();
205      f = pollForSpecificCompletedTask(timeout, unit, i);
206      duration = EnvironmentEdgeManager.currentTime() - start;
207
208      // Even with operationTimeout less than 0, still loop through the rest as there could
209      // be other completed tasks before operationTimeout.
210      timeout -= duration;
211
212      if (f == null) {
213        return null;
214      } else if (f.getExeEx() != null) {
215        // we continue here as we need to loop through all the results.
216        if (LOG.isDebugEnabled()) {
217          LOG.debug("Replica " + ((f == null) ? 0 : f.getReplicaId()) + " returns "
218            + f.getExeEx().getCause());
219        }
220
221        if (i == (endIndex - 1)) {
222          // Rethrow this exception
223          throw f.getExeEx();
224        }
225        continue;
226      }
227
228      return f;
229    }
230
231    return null;
232  }
233
234  /**
235   * Poll for the Nth completed task (index starts from 0 (the 1st), 1 (the second)...)
236   * @param timeout - time to wait before it times out
237   * @param unit    - time unit for timeout
238   * @param index   - the index(th) completed task, index starting from 0
239   */
240  private QueueingFuture<V> pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index)
241    throws InterruptedException {
242    if (index < 0) {
243      return null;
244    }
245
246    synchronized (tasks) {
247      if (!cancelled && (completedTasks.size() <= index)) unit.timedWait(tasks, timeout);
248      if (completedTasks.size() <= index) return null;
249    }
250    return completedTasks.get(index);
251  }
252
253  public void cancelAll() {
254    // Grab the lock on tasks so that cancelled is visible everywhere
255    synchronized (tasks) {
256      cancelled = true;
257    }
258    for (QueueingFuture<V> future : tasks) {
259      if (future != null) future.cancel(true);
260    }
261  }
262}