001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.util; 021 022 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.ArrayBlockingQueue; 026import java.util.concurrent.BlockingQueue; 027import java.util.concurrent.Callable; 028import java.util.concurrent.Executor; 029import java.util.concurrent.Future; 030import java.util.concurrent.FutureTask; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.yetus.audience.InterfaceAudience; 034 035/** 036 * A completion service, close to the one available in the JDK 1.7 037 * However, this ones keeps the list of the future, and allows to cancel them all. 038 * This means as well that it can be used for a small set of tasks only. 039 * <br>Implementation is not Thread safe. 040 */ 041@InterfaceAudience.Private 042public class BoundedCompletionService<V> { 043 private final Executor executor; 044 private final List<Future<V>> tasks; // alls the tasks 045 private final BlockingQueue<Future<V>> completed; // all the tasks that are completed 046 047 class QueueingFuture extends FutureTask<V> { 048 049 public QueueingFuture(Callable<V> callable) { 050 super(callable); 051 } 052 053 @Override 054 protected void done() { 055 completed.add(QueueingFuture.this); 056 } 057 } 058 059 public BoundedCompletionService(Executor executor, int maxTasks) { 060 this.executor = executor; 061 this.tasks = new ArrayList<>(maxTasks); 062 this.completed = new ArrayBlockingQueue<>(maxTasks); 063 } 064 065 066 public Future<V> submit(Callable<V> task) { 067 QueueingFuture newFuture = new QueueingFuture(task); 068 executor.execute(newFuture); 069 tasks.add(newFuture); 070 return newFuture; 071 } 072 073 public Future<V> take() throws InterruptedException{ 074 return completed.take(); 075 } 076 077 public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException{ 078 return completed.poll(timeout, unit); 079 } 080 081 public void cancelAll(boolean interrupt) { 082 for (Future<V> future : tasks) { 083 future.cancel(interrupt); 084 } 085 } 086}