001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import org.apache.yetus.audience.InterfaceAudience;
022
023import java.util.Comparator;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.PriorityBlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.Lock;
029import java.util.concurrent.locks.ReentrantLock;
030
/**
 * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor.
 * This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the
 * steal-from ThreadPoolExecutor. The behavior of this queue is the same as a normal
 * PriorityBlockingQueue except the take/poll(long,TimeUnit) methods would also check whether there
 * are jobs in the steal-from queue if this queue is empty.
 *
 * Note the workers in ThreadPoolExecutor must be pre-started so that they can steal jobs from the
 * other queue, otherwise the worker will only be started after there are jobs submitted to the
 * main queue.
 */
042@InterfaceAudience.Private
public class StealJobQueue<T> extends PriorityBlockingQueue<T> {

  private static final long serialVersionUID = -6334572230936888291L;

  /** Secondary queue whose jobs may be taken by consumers of this queue; built in the constructor. */
  private final BlockingQueue<T> stealFromQueue;

  /**
   * Lock shared by producers of both queues and by consumers of this queue. Producers signal
   * {@link #notEmpty} while holding it, and consumers check both queues and wait while holding
   * it, so an insertion cannot slip in between a consumer's empty check and its wait.
   */
  private final Lock lock = new ReentrantLock();

  /** Signalled once for every successful {@code offer} on this queue or on the steal-from queue. */
  private final transient Condition notEmpty = lock.newCondition();

  /**
   * Creates the queue and its steal-from queue, both with an initial capacity of 11.
   * @param comparator ordering applied to both this queue and the steal-from queue
   */
  public StealJobQueue(Comparator<? super T> comparator) {
    this(11, 11, comparator);
  }

  /**
   * Creates the queue together with the steal-from queue it will fall back to.
   * @param initCapacity initial capacity of this queue
   * @param stealFromQueueInitCapacity initial capacity of the steal-from queue
   * @param comparator ordering applied to both this queue and the steal-from queue
   */
  public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity,
      Comparator<? super T> comparator) {
    super(initCapacity, comparator);
    this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) {

      private static final long serialVersionUID = -6805567216580184701L;

      /**
       * Inserts into the steal-from queue and wakes one consumer that is waiting on the
       * enclosing queue.
       */
      @Override
      public boolean offer(T t) {
        lock.lock();
        try {
          boolean added = super.offer(t);
          // Signal only after the element is really in the queue; a failed insert wakes nobody.
          notEmpty.signal();
          return added;
        } finally {
          lock.unlock();
        }
      }
    };
  }

  /**
   * Get a queue whose job might be stolen by the consumer of this original queue
   * @return the queue whose job could be stolen
   */
  public BlockingQueue<T> getStealFromQueue() {
    return stealFromQueue;
  }

  /**
   * Inserts the element into this queue and wakes one consumer blocked in {@link #take()} or
   * {@link #poll(long, TimeUnit)}.
   * @param t the element to add
   * @return the result of the underlying {@link PriorityBlockingQueue#offer(Object)}
   */
  @Override
  public boolean offer(T t) {
    lock.lock();
    try {
      boolean added = super.offer(t);
      // Signal only after the element is really in the queue; a failed insert wakes nobody.
      notEmpty.signal();
      return added;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Retrieves and removes a job, preferring this queue and falling back to the steal-from queue.
   * Waits until one of the two queues yields a job.
   * @return the job that was removed, never {@code null}
   * @throws InterruptedException if interrupted while acquiring the lock or while waiting
   */
  @Override
  public T take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      T job;
      // Re-check both queues after every wake-up: another consumer may have won the race.
      while ((job = pollLocalThenSteal()) == null) {
        notEmpty.await();
      }
      return job;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Retrieves and removes a job, preferring this queue and falling back to the steal-from queue,
   * waiting up to the given time for one to become available.
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit the time unit of {@code timeout}
   * @return the job that was removed, or {@code null} if the wait time elapsed with both queues
   *         still empty
   * @throws InterruptedException if interrupted while acquiring the lock or while waiting
   */
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    try {
      T job;
      while ((job = pollLocalThenSteal()) == null) {
        if (nanos <= 0) {
          return null;
        }
        // awaitNanos returns the remaining budget, so repeated wake-ups never extend the wait.
        nanos = notEmpty.awaitNanos(nanos);
      }
      return job;
    } finally {
      lock.unlock();
    }
  }

  /** Non-blocking fetch: the head of this queue if it has one, else the steal-from queue's head. */
  private T pollLocalThenSteal() {
    T job = poll();
    return job != null ? job : stealFromQueue.poll();
  }
}
139