001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.util.Comparator; 021import java.util.concurrent.BlockingQueue; 022import java.util.concurrent.PriorityBlockingQueue; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.locks.Condition; 025import java.util.concurrent.locks.Lock; 026import java.util.concurrent.locks.ReentrantLock; 027import org.apache.yetus.audience.InterfaceAudience; 028 029/** 030 * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor. This queue 031 * also acts as the factory for creating the PriorityBlockingQueue to be used in the steal-from 032 * ThreadPoolExecutor. The behavior of this queue is the same as a normal PriorityBlockingQueue 033 * except the take/poll(long,TimeUnit) methods would also check whether there are jobs in the 034 * steal-from queue if this q ueue is empty. Note the workers in ThreadPoolExecutor must be 035 * pre-started so that they can steal job from the other queue, otherwise the worker will only be 036 * started after there are jobs submitted to main queue. 037 */ 038@InterfaceAudience.Private 039public class StealJobQueue<T> extends PriorityBlockingQueue<T> { 040 041 private static final long serialVersionUID = -6334572230936888291L; 042 043 private BlockingQueue<T> stealFromQueue; 044 045 private final Lock lock = new ReentrantLock(); 046 private final transient Condition notEmpty = lock.newCondition(); 047 048 public StealJobQueue(Comparator<? super T> comparator) { 049 this(11, 11, comparator); 050 } 051 052 public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity, 053 Comparator<? super T> comparator) { 054 super(initCapacity, comparator); 055 this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) { 056 057 private static final long serialVersionUID = -6805567216580184701L; 058 059 @Override 060 public boolean offer(T t) { 061 lock.lock(); 062 try { 063 notEmpty.signal(); 064 return super.offer(t); 065 } finally { 066 lock.unlock(); 067 } 068 } 069 }; 070 } 071 072 /** 073 * Get a queue whose job might be stolen by the consumer of this original queue 074 * @return the queue whose job could be stolen 075 */ 076 public BlockingQueue<T> getStealFromQueue() { 077 return stealFromQueue; 078 } 079 080 @Override 081 public boolean offer(T t) { 082 lock.lock(); 083 try { 084 notEmpty.signal(); 085 return super.offer(t); 086 } finally { 087 lock.unlock(); 088 } 089 } 090 091 @Override 092 public T take() throws InterruptedException { 093 lock.lockInterruptibly(); 094 try { 095 while (true) { 096 T retVal = this.poll(); 097 if (retVal == null) { 098 retVal = stealFromQueue.poll(); 099 } 100 if (retVal == null) { 101 notEmpty.await(); 102 } else { 103 return retVal; 104 } 105 } 106 } finally { 107 lock.unlock(); 108 } 109 } 110 111 @Override 112 public T poll(long timeout, TimeUnit unit) throws InterruptedException { 113 long nanos = unit.toNanos(timeout); 114 lock.lockInterruptibly(); 115 try { 116 while (true) { 117 T retVal = this.poll(); 118 if (retVal == null) { 119 retVal = stealFromQueue.poll(); 120 } 121 if (retVal == null) { 122 if (nanos <= 0) return null; 123 nanos = notEmpty.awaitNanos(nanos); 124 } else { 125 return retVal; 126 } 127 } 128 } finally { 129 lock.unlock(); 130 } 131 } 132}