/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.normalizer;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is
 * an ordered collection class that has the following properties:
 * <ul>
 * <li>Guarantees uniqueness of elements, as a {@link Set}.</li>
 * <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li>
 * <li>Work is retrieved on a FIFO policy.</li>
 * <li>Work retrieval blocks the calling thread until new work is available, as a
 * {@link BlockingQueue}.</li>
 * <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
 * </ul>
 * Assumes low-frequency and low-parallelism concurrent access, so protects state using a
 * simplistic synchronization strategy: one lock guards every read and write of the underlying
 * {@link LinkedHashSet}. The set is not thread-safe, so producers and consumers must never be
 * allowed to touch it under different locks.
 */
@InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> {

  /**
   * Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. Only
   * read, mutated, or reassigned while {@link #lock} is held.
   */
  private LinkedHashSet<E> delegate;

  /** Single lock guarding all access to {@code delegate}, for producers and consumers alike. */
  private final ReentrantLock lock;

  /** Wait queue for waiting takes; bound to {@link #lock}. */
  private final Condition notEmpty;

  RegionNormalizerWorkQueue() {
    delegate = new LinkedHashSet<>();
    lock = new ReentrantLock();
    notEmpty = lock.newCondition();
  }

  /**
   * Wakes one waiting take if there is work available. The caller must hold {@link #lock}, both
   * because {@link Condition#signal()} requires it and so that the emptiness check observes the
   * state the caller just produced.
   */
  private void signalIfNotEmpty() {
    if (!delegate.isEmpty()) {
      notEmpty.signal();
    }
  }

  /**
   * Inserts the specified element at the tail of the queue, if it's not already present.
   *
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   */
  public void put(E e) {
    if (e == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      delegate.add(e);
      signalIfNotEmpty();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the specified element at the head of the queue. If the element is already present,
   * it is moved to the head.
   *
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   */
  public void putFirst(E e) {
    if (e == null) {
      throw new NullPointerException();
    }
    putAllFirst(Collections.singleton(e));
  }

  /**
   * Inserts the specified elements at the tail of the queue. Any elements already present in
   * the queue are ignored.
   *
   * @param c the elements to add
   * @throws NullPointerException if {@code c} is null
   */
  public void putAll(Collection<? extends E> c) {
    if (c == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      delegate.addAll(c);
      signalIfNotEmpty();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the specified elements at the head of the queue, in the iteration order of
   * {@code c}. Elements already present in the queue are moved to the head.
   *
   * @param c the elements to add
   * @throws NullPointerException if {@code c} is null
   */
  public void putAllFirst(Collection<? extends E> c) {
    if (c == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      // LinkedHashSet has no "insert at head", so rebuild: new elements first, then the existing
      // ones. Duplicates keep the position of their first insertion, i.e. the head.
      final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
      copy.addAll(c);
      copy.addAll(delegate);
      delegate = copy;
      signalIfNotEmpty();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, waiting if necessary
   * until an element becomes available.
   *
   * @return the head of this queue
   * @throws InterruptedException if interrupted while waiting
   */
  public E take() throws InterruptedException {
    E x;
    lock.lockInterruptibly();
    try {
      // await() releases the lock while parked and reacquires it before returning; loop to
      // re-check the condition after every wakeup.
      while (delegate.isEmpty()) {
        notEmpty.await();
      }
      final Iterator<E> iter = delegate.iterator();
      x = iter.next();
      iter.remove();
      // Producers signal only one waiter per insertion call, so cascade the wakeup when more
      // work remains (e.g. after a bulk insert).
      signalIfNotEmpty();
    } finally {
      lock.unlock();
    }
    return x;
  }

  /**
   * Atomically removes all of the elements from this queue.
   * The queue will be empty after this call returns.
   */
  public void clear() {
    lock.lock();
    try {
      delegate.clear();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the number of elements in this queue.
   *
   * @return the number of elements in this queue
   */
  public int size() {
    lock.lock();
    try {
      return delegate.size();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public String toString() {
    lock.lock();
    try {
      return delegate.toString();
    } finally {
      lock.unlock();
    }
  }
}