/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A completion service, close to the one available in the JDK 1.7. However, this one keeps the
 * list of the futures, and allows to cancel them all. This means as well that it can be used for a
 * small set of tasks only. <br>
 * Implementation is not thread safe.
 * @param <V> type of the result produced by the submitted tasks
 */
@InterfaceAudience.Private
public class BoundedCompletionService<V> {
  /** Runs the futures created by {@link #submit(Callable)}. */
  private final Executor executor;

  /** All the tasks handed to the executor so far; walked by {@link #cancelAll(boolean)}. */
  private final List<Future<V>> tasks;

  /** All the tasks that are done and have not yet been retrieved by the caller. */
  private final BlockingQueue<Future<V>> completed;

  /**
   * Creates a completion service on top of the given executor.
   * @param executor the executor every submitted task is handed to
   * @param maxTasks initial capacity of the task list and fixed capacity of the queue holding the
   *                 completed futures
   */
  public BoundedCompletionService(Executor executor, int maxTasks) {
    this.executor = executor;
    this.tasks = new ArrayList<>(maxTasks);
    this.completed = new ArrayBlockingQueue<>(maxTasks);
  }

  /**
   * Wraps the task in a future, hands it to the executor and remembers it for
   * {@link #cancelAll(boolean)}.
   * @param task the work to run
   * @return the future tracking the task
   */
  public Future<V> submit(Callable<V> task) {
    final QueueingFuture queued = new QueueingFuture(task);
    executor.execute(queued);
    // Reached only when execute() returned normally, so a task the executor refused by throwing
    // is never tracked.
    tasks.add(queued);
    return queued;
  }

  /**
   * Retrieves and removes the next done future, waiting until one is available.
   * @return the next done future
   * @throws InterruptedException if interrupted while waiting
   */
  public Future<V> take() throws InterruptedException {
    final Future<V> finished = completed.take();
    return finished;
  }

  /**
   * Retrieves and removes the next done future, waiting up to the given time for one to become
   * available.
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    how to interpret {@code timeout}
   * @return the next done future, or {@code null} if the wait time elapsed first
   * @throws InterruptedException if interrupted while waiting
   */
  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
    final Future<V> finished = completed.poll(timeout, unit);
    return finished;
  }

  /**
   * Calls {@link Future#cancel(boolean)} on every task submitted so far.
   * @param interrupt passed through to {@link Future#cancel(boolean)}
   */
  public void cancelAll(boolean interrupt) {
    for (final Future<V> submitted : tasks) {
      submitted.cancel(interrupt);
    }
  }

  /**
   * Future that adds itself to the queue of completed futures as soon as it is done.
   */
  class QueueingFuture extends FutureTask<V> {

    public QueueingFuture(Callable<V> callable) {
      super(callable);
    }

    /**
     * Publishes this future to {@link #take()} and {@link #poll(long, TimeUnit)}. Note that
     * {@code add} throws {@link IllegalStateException} rather than blocking when the queue is
     * already at capacity.
     */
    @Override
    protected void done() {
      completed.add(this);
    }
  }
}