001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.concurrent.ArrayBlockingQueue;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.Callable;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Future;
027import java.util.concurrent.FutureTask;
028import java.util.concurrent.TimeUnit;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * A completion service, close to the one available in the JDK 1.7 However, this ones keeps the list
033 * of the future, and allows to cancel them all. This means as well that it can be used for a small
034 * set of tasks only. <br>
035 * Implementation is not Thread safe.
036 */
037@InterfaceAudience.Private
038public class BoundedCompletionService<V> {
039  private final Executor executor;
040  private final List<Future<V>> tasks; // alls the tasks
041  private final BlockingQueue<Future<V>> completed; // all the tasks that are completed
042
043  class QueueingFuture extends FutureTask<V> {
044
045    public QueueingFuture(Callable<V> callable) {
046      super(callable);
047    }
048
049    @Override
050    protected void done() {
051      completed.add(QueueingFuture.this);
052    }
053  }
054
055  public BoundedCompletionService(Executor executor, int maxTasks) {
056    this.executor = executor;
057    this.tasks = new ArrayList<>(maxTasks);
058    this.completed = new ArrayBlockingQueue<>(maxTasks);
059  }
060
061  public Future<V> submit(Callable<V> task) {
062    QueueingFuture newFuture = new QueueingFuture(task);
063    executor.execute(newFuture);
064    tasks.add(newFuture);
065    return newFuture;
066  }
067
068  public Future<V> take() throws InterruptedException {
069    return completed.take();
070  }
071
072  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
073    return completed.poll(timeout, unit);
074  }
075
076  public void cancelAll(boolean interrupt) {
077    for (Future<V> future : tasks) {
078      future.cancel(interrupt);
079    }
080  }
081}