001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.util.Comparator;
021import java.util.concurrent.BlockingQueue;
022import java.util.concurrent.PriorityBlockingQueue;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.locks.Condition;
025import java.util.concurrent.locks.Lock;
026import java.util.concurrent.locks.ReentrantLock;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor. This queue
031 * also acts as the factory for creating the PriorityBlockingQueue to be used in the steal-from
032 * ThreadPoolExecutor. The behavior of this queue is the same as a normal PriorityBlockingQueue
033 * except the take/poll(long,TimeUnit) methods would also check whether there are jobs in the
034 * steal-from queue if this q ueue is empty. Note the workers in ThreadPoolExecutor must be
035 * pre-started so that they can steal job from the other queue, otherwise the worker will only be
036 * started after there are jobs submitted to main queue.
037 */
038@InterfaceAudience.Private
039public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
040
041  private static final long serialVersionUID = -6334572230936888291L;
042
043  private BlockingQueue<T> stealFromQueue;
044
045  private final Lock lock = new ReentrantLock();
046  private final transient Condition notEmpty = lock.newCondition();
047
048  public StealJobQueue(Comparator<? super T> comparator) {
049    this(11, 11, comparator);
050  }
051
052  public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity,
053    Comparator<? super T> comparator) {
054    super(initCapacity, comparator);
055    this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) {
056
057      private static final long serialVersionUID = -6805567216580184701L;
058
059      @Override
060      public boolean offer(T t) {
061        lock.lock();
062        try {
063          notEmpty.signal();
064          return super.offer(t);
065        } finally {
066          lock.unlock();
067        }
068      }
069    };
070  }
071
072  /**
073   * Get a queue whose job might be stolen by the consumer of this original queue
074   * @return the queue whose job could be stolen
075   */
076  public BlockingQueue<T> getStealFromQueue() {
077    return stealFromQueue;
078  }
079
080  @Override
081  public boolean offer(T t) {
082    lock.lock();
083    try {
084      notEmpty.signal();
085      return super.offer(t);
086    } finally {
087      lock.unlock();
088    }
089  }
090
091  @Override
092  public T take() throws InterruptedException {
093    lock.lockInterruptibly();
094    try {
095      while (true) {
096        T retVal = this.poll();
097        if (retVal == null) {
098          retVal = stealFromQueue.poll();
099        }
100        if (retVal == null) {
101          notEmpty.await();
102        } else {
103          return retVal;
104        }
105      }
106    } finally {
107      lock.unlock();
108    }
109  }
110
111  @Override
112  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
113    long nanos = unit.toNanos(timeout);
114    lock.lockInterruptibly();
115    try {
116      while (true) {
117        T retVal = this.poll();
118        if (retVal == null) {
119          retVal = stealFromQueue.poll();
120        }
121        if (retVal == null) {
122          if (nanos <= 0) return null;
123          nanos = notEmpty.awaitNanos(nanos);
124        } else {
125          return retVal;
126        }
127      }
128    } finally {
129      lock.unlock();
130    }
131  }
132}