001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.util; 020 021import org.apache.yetus.audience.InterfaceAudience; 022 023import java.util.Comparator; 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.PriorityBlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.locks.Condition; 028import java.util.concurrent.locks.Lock; 029import java.util.concurrent.locks.ReentrantLock; 030 031/** 032 * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor. 033 * This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the 034 * steal-from ThreadPoolExecutor. The behavior of this queue is the same as a normal 035 * PriorityBlockingQueue except the take/poll(long,TimeUnit) methods would also check whether there 036 * are jobs in the steal-from queue if this q ueue is empty. 037 * 038 * Note the workers in ThreadPoolExecutor must be pre-started so that they can steal job from the 039 * other queue, otherwise the worker will only be started after there are jobs submitted to main 040 * queue. 041 */ 042@InterfaceAudience.Private 043public class StealJobQueue<T> extends PriorityBlockingQueue<T> { 044 045 private static final long serialVersionUID = -6334572230936888291L; 046 047 private BlockingQueue<T> stealFromQueue; 048 049 private final Lock lock = new ReentrantLock(); 050 private final transient Condition notEmpty = lock.newCondition(); 051 052 public StealJobQueue(Comparator<? super T> comparator) { 053 this(11, 11, comparator); 054 } 055 056 public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity, 057 Comparator<? super T> comparator) { 058 super(initCapacity, comparator); 059 this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) { 060 061 private static final long serialVersionUID = -6805567216580184701L; 062 063 @Override 064 public boolean offer(T t) { 065 lock.lock(); 066 try { 067 notEmpty.signal(); 068 return super.offer(t); 069 } finally { 070 lock.unlock(); 071 } 072 } 073 }; 074 } 075 076 /** 077 * Get a queue whose job might be stolen by the consumer of this original queue 078 * @return the queue whose job could be stolen 079 */ 080 public BlockingQueue<T> getStealFromQueue() { 081 return stealFromQueue; 082 } 083 084 @Override 085 public boolean offer(T t) { 086 lock.lock(); 087 try { 088 notEmpty.signal(); 089 return super.offer(t); 090 } finally { 091 lock.unlock(); 092 } 093 } 094 095 096 @Override 097 public T take() throws InterruptedException { 098 lock.lockInterruptibly(); 099 try { 100 while (true) { 101 T retVal = this.poll(); 102 if (retVal == null) { 103 retVal = stealFromQueue.poll(); 104 } 105 if (retVal == null) { 106 notEmpty.await(); 107 } else { 108 return retVal; 109 } 110 } 111 } finally { 112 lock.unlock(); 113 } 114 } 115 116 @Override 117 public T poll(long timeout, TimeUnit unit) throws InterruptedException { 118 long nanos = unit.toNanos(timeout); 119 lock.lockInterruptibly(); 120 try { 121 while (true) { 122 T retVal = this.poll(); 123 if (retVal == null) { 124 retVal = stealFromQueue.poll(); 125 } 126 if (retVal == null) { 127 if (nanos <= 0) 128 return null; 129 nanos = notEmpty.awaitNanos(nanos); 130 } else { 131 return retVal; 132 } 133 } 134 } finally { 135 lock.unlock(); 136 } 137 } 138} 139