001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 021import org.apache.yetus.audience.InterfaceAudience; 022 023@InterfaceAudience.Private 024public class FairQueue<T extends Comparable<T>> { 025 026 private Queue<T> queueHead = null; 027 private int size = 0; 028 029 public boolean hasRunnables() { 030 return size > 0; 031 } 032 033 public void add(Queue<T> queue) { 034 // For normal priority queue, just append it to the tail 035 if (queueHead == null || queue.getPriority() == 1) { 036 queueHead = AvlIterableList.append(queueHead, queue); 037 size++; 038 return; 039 } 040 // Find the one which priority is less than us 041 // For now only TableQueue and ServerQueue has priority. For TableQueue there are only a small 042 // number of tables which have higher priority, and for ServerQueue there is only one server 043 // which could carry meta which leads to a higher priority, so this will not be an expensive 044 // operation. 045 Queue<T> base = queueHead; 046 do { 047 if (base.getPriority() < queue.getPriority()) { 048 queueHead = AvlIterableList.prepend(queueHead, base, queue); 049 size++; 050 return; 051 } 052 base = AvlIterableList.readNext(base); 053 } while (base != queueHead); 054 // no one is lower than us, append to the tail 055 queueHead = AvlIterableList.append(queueHead, queue); 056 size++; 057 } 058 059 public void remove(Queue<T> queue) { 060 queueHead = AvlIterableList.remove(queueHead, queue); 061 size--; 062 } 063 064 public Queue<T> poll() { 065 if (queueHead == null) { 066 return null; 067 } 068 Queue<T> q = queueHead; 069 do { 070 if (q.isAvailable()) { 071 if (q.getPriority() == 1) { 072 // for the normal priority queue, remove it and append it to the tail 073 queueHead = AvlIterableList.remove(queueHead, q); 074 queueHead = AvlIterableList.append(queueHead, q); 075 } 076 return q; 077 } 078 q = AvlIterableList.readNext(q); 079 } while (q != queueHead); 080 return null; 081 } 082}