/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.normalizer;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is an
 * ordered collection class that has the following properties:
 * <ul>
 * <li>Guarantees uniqueness of elements, as a {@link Set}.</li>
 * <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li>
 * <li>Work is retrieved on a FIFO policy.</li>
 * <li>Work retrieval blocks the calling thread until new work is available, as a
 * {@link BlockingQueue}.</li>
 * <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
 * </ul>
 * <p>
 * Every mutating operation runs under the write lock of a single {@link ReentrantReadWriteLock};
 * {@link #size()} and {@link #toString()} run under its read lock. {@code null} elements are never
 * accepted.
 * </p>
 */
@InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> {

  /**
   * Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. Not
   * final: {@link #putAllFirst(Collection)} swaps in a rebuilt instance while holding the write
   * lock.
   */
  private LinkedHashSet<E> delegate;

  /** Lock for puts and takes. */
  private final ReentrantReadWriteLock lock;
  /** Wait queue for waiting takes; bound to the write lock of {@link #lock}. */
  private final Condition notEmpty;

  RegionNormalizerWorkQueue() {
    delegate = new LinkedHashSet<>();
    lock = new ReentrantReadWriteLock();
    notEmpty = lock.writeLock().newCondition();
  }

  /**
   * Inserts the specified element at the tail of the queue, if it's not already present. An element
   * that is already queued keeps its current position.
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   */
  public void put(E e) {
    Objects.requireNonNull(e, "e");
    lock.writeLock().lock();
    try {
      delegate.add(e);
      // The queue holds at least one element at this point, so a waiting taker can proceed.
      notEmpty.signal();
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Inserts the specified element at the head of the queue. If the element is already queued it is
   * moved to the head.
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   */
  public void putFirst(E e) {
    Objects.requireNonNull(e, "e");
    putAllFirst(Collections.singleton(e));
  }

  /**
   * Inserts the specified elements at the tail of the queue. Any elements already present in the
   * queue are ignored.
   * @param c the elements to add
   * @throws NullPointerException if {@code c} is null or contains a null element; in that case the
   *                              queue is left unmodified
   */
  public void putAll(Collection<? extends E> c) {
    Objects.requireNonNull(c, "c");
    requireNoNullElements(c);
    lock.writeLock().lock();
    try {
      delegate.addAll(c);
      // c may have been empty, so only wake a taker when there really is something to take.
      if (!delegate.isEmpty()) {
        notEmpty.signal();
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Inserts the specified elements at the head of the queue, in the iteration order of {@code c}.
   * Elements of {@code c} that are already queued are moved to the head.
   * @param c the elements to add
   * @throws NullPointerException if {@code c} is null or contains a null element; in that case the
   *                              queue is left unmodified
   */
  public void putAllFirst(Collection<? extends E> c) {
    Objects.requireNonNull(c, "c");
    requireNoNullElements(c);
    lock.writeLock().lock();
    try {
      // LinkedHashSet cannot insert at its head, so rebuild it: the new elements go in first and
      // the existing ones follow; duplicates keep the position of their first insertion.
      final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
      copy.addAll(c);
      copy.addAll(delegate);
      delegate = copy;
      if (!delegate.isEmpty()) {
        notEmpty.signal();
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, waiting if necessary until an element becomes
   * available.
   * @return the head of this queue
   * @throws InterruptedException if interrupted while waiting
   */
  public E take() throws InterruptedException {
    E x;
    // Take a write lock. If the delegate's queue is empty we need it to await(), which will
    // drop the lock, then reacquire it; or if the queue is not empty we will use an iterator
    // to mutate the head.
    lock.writeLock().lockInterruptibly();
    try {
      while (delegate.isEmpty()) {
        notEmpty.await(); // await drops the lock, then reacquires it
      }
      final Iterator<E> iter = delegate.iterator();
      x = iter.next();
      iter.remove();
      // Producers signal only one waiter per call, even when adding several elements. Pass the
      // wake-up along so another blocked taker can pick up whatever is still queued.
      if (!delegate.isEmpty()) {
        notEmpty.signal();
      }
    } finally {
      lock.writeLock().unlock();
    }
    return x;
  }

  /**
   * Atomically removes all of the elements from this queue. The queue will be empty after this call
   * returns.
   */
  public void clear() {
    lock.writeLock().lock();
    try {
      delegate.clear();
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Returns the number of elements in this queue.
   * @return the number of elements in this queue
   */
  public int size() {
    lock.readLock().lock();
    try {
      return delegate.size();
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public String toString() {
    lock.readLock().lock();
    try {
      return delegate.toString();
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Fails fast when a bulk insert would queue a {@code null} element. Iterates rather than calling
   * {@code contains(null)} because some collections throw from that call.
   */
  private static void requireNoNullElements(Collection<?> c) {
    for (Object element : c) {
      Objects.requireNonNull(element, "collection must not contain null elements");
    }
  }
}