View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.util;
20  
21  import org.apache.hadoop.hbase.classification.InterfaceAudience;
22  
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.PriorityBlockingQueue;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.locks.Condition;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  /**
31   * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor.
32   * This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the
33   * steal-from ThreadPoolExecutor. The behavior of this queue is the same as a normal
34   * PriorityBlockingQueue except the take/poll(long,TimeUnit) methods would also check whether there
35   * are jobs in the steal-from queue if this queue is empty.
36   *
37   * Note the workers in ThreadPoolExecutor must be pre-started so that they can steal jobs from the
38   * other queue, otherwise the worker will only be started after there are jobs submitted to main
39   * queue.
40   */
41  @InterfaceAudience.Private
42  public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
43  
44    private BlockingQueue<T> stealFromQueue;
45  
46    private final Lock lock = new ReentrantLock();
47    private final Condition notEmpty = lock.newCondition();
48  
49    public StealJobQueue() {
50      this.stealFromQueue = new PriorityBlockingQueue<T>() {
51        @Override
52        public boolean offer(T t) {
53          lock.lock();
54          try {
55            notEmpty.signal();
56            return super.offer(t);
57          } finally {
58            lock.unlock();
59          }
60        }
61      };
62    }
63  
64    public BlockingQueue<T> getStealFromQueue() {
65      return stealFromQueue;
66    }
67  
68    @Override
69    public boolean offer(T t) {
70      lock.lock();
71      try {
72        notEmpty.signal();
73        return super.offer(t);
74      } finally {
75        lock.unlock();
76      }
77    }
78  
79  
80    @Override
81    public T take() throws InterruptedException {
82      lock.lockInterruptibly();
83      try {
84        while (true) {
85          T retVal = this.poll();
86          if (retVal == null) {
87            retVal = stealFromQueue.poll();
88          }
89          if (retVal == null) {
90            notEmpty.await();
91          } else {
92            return retVal;
93          }
94        }
95      } finally {
96        lock.unlock();
97      }
98    }
99  
100   @Override
101   public T poll(long timeout, TimeUnit unit) throws InterruptedException {
102     long nanos = unit.toNanos(timeout);
103     lock.lockInterruptibly();
104     try {
105       while (true) {
106         T retVal = this.poll();
107         if (retVal == null) {
108           retVal = stealFromQueue.poll();
109         }
110         if (retVal == null) {
111           if (nanos <= 0)
112             return null;
113           nanos = notEmpty.awaitNanos(nanos);
114         } else {
115           return retVal;
116         }
117       }
118     } finally {
119       lock.unlock();
120     }
121   }
122 }
123