View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.util;
21  
22  
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.concurrent.ArrayBlockingQueue;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Future;
30  import java.util.concurrent.FutureTask;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  
35  /**
36   * A completion service, close to the one available in the JDK 1.7.
37   * However, this one keeps the list of the futures, and allows cancelling them all.
38   * This also means that it should be used for a small set of tasks only.
39   * <br>Implementation is not thread safe.
40   */
41  @InterfaceAudience.Private
42  public class BoundedCompletionService<V> {
43    private final Executor executor;
44    private final List<Future<V>> tasks; // alls the tasks
45    private final BlockingQueue<Future<V>> completed; // all the tasks that are completed
46  
47    class QueueingFuture extends FutureTask<V> {
48  
49      public QueueingFuture(Callable<V> callable) {
50        super(callable);
51      }
52  
53      @Override
54      protected void done() {
55        completed.add(QueueingFuture.this);
56      }
57    }
58  
59    public BoundedCompletionService(Executor executor, int maxTasks) {
60      this.executor = executor;
61      this.tasks = new ArrayList<Future<V>>(maxTasks);
62      this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
63    }
64  
65  
66    public Future<V> submit(Callable<V> task) {
67      QueueingFuture newFuture = new QueueingFuture(task);
68      executor.execute(newFuture);
69      tasks.add(newFuture);
70      return newFuture;
71    }
72  
73    public  Future<V> take() throws InterruptedException{
74      return completed.take();
75    }
76  
77    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException{
78      return completed.poll(timeout, unit);
79    }
80  
81    public void cancelAll(boolean interrupt) {
82      for (Future<V> future : tasks) {
83        future.cancel(interrupt);
84      }
85    }
86  }