001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import java.util.Collection;
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.LinkedHashSet;
024import java.util.Queue;
025import java.util.Set;
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.ReentrantLock;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is
033 * an ordered collection class that has the following properties:
034 * <ul>
035 *   <li>Guarantees uniqueness of elements, as a {@link Set}.</li>
036 *   <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li>
037 *   <li>Work is retrieved on a FIFO policy.</li>
038 *   <li>Work retrieval blocks the calling thread until new work is available, as a
039 *     {@link BlockingQueue}.</li>
040 *   <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
041 * </ul>
042 * Assumes low-frequency and low-parallelism concurrent access, so protects state using a
043 * simplistic synchronization strategy.
044 */
045@InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> {

  /**
   * Underlying storage structure that gives us the Set behavior and FIFO retrieval policy.
   * It has no synchronization of its own; every read, mutation, and reassignment of this field
   * happens while holding {@link #lock}.
   */
  private LinkedHashSet<E> delegate;

  // A LinkedHashSet is one shared structure, so the two-lock (put/take) split used by
  // LinkedBlockingQueue is not safe here: a put and a take would be free to mutate the set at
  // the same time. A single lock guards all access to `delegate` instead.

  /** Lock held by every operation that reads or modifies {@link #delegate}. */
  private final ReentrantLock lock;

  /** Wait queue for waiting takes; bound to {@link #lock}. */
  private final Condition notEmpty;

  RegionNormalizerWorkQueue() {
    delegate = new LinkedHashSet<>();
    lock = new ReentrantLock();
    notEmpty = lock.newCondition();
  }

  /**
   * Signals a waiting take if there is work available. The caller must hold {@link #lock}.
   */
  private void signalIfNotEmpty() {
    if (!delegate.isEmpty()) {
      notEmpty.signal();
    }
  }

  /**
   * Inserts the specified element at the tail of the queue, if it's not already present.
   *
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   */
  public void put(E e) {
    if (e == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      delegate.add(e);
      // the emptiness check and the signal happen under the same lock as the insertion
      signalIfNotEmpty();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the specified element at the head of the queue. If the element is already present,
   * it is moved to the head.
   *
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   */
  public void putFirst(E e) {
    if (e == null) {
      throw new NullPointerException();
    }
    putAllFirst(Collections.singleton(e));
  }

  /**
   * Inserts the specified elements at the tail of the queue. Any elements already present in
   * the queue are ignored.
   *
   * @param c the elements to add
   * @throws NullPointerException if {@code c} is null
   */
  public void putAll(Collection<? extends E> c) {
    if (c == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      delegate.addAll(c);
      // one signal is enough: take() passes the signal on while elements remain
      signalIfNotEmpty();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the specified elements at the head of the queue, in the iteration order of
   * {@code c}. Elements of {@code c} that were already queued are moved to the head.
   *
   * @param c the elements to add
   * @throws NullPointerException if {@code c} is null
   */
  public void putAllFirst(Collection<? extends E> c) {
    if (c == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      // LinkedHashSet cannot insert at its head, so rebuild it: new elements first, then the
      // existing contents (duplicates keep the earlier, head-side position).
      final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
      copy.addAll(c);
      copy.addAll(delegate);
      delegate = copy;
      signalIfNotEmpty();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, waiting if necessary
   * until an element becomes available.
   *
   * @return the head of this queue
   * @throws InterruptedException if interrupted while waiting
   */
  public E take() throws InterruptedException {
    E x;
    lock.lockInterruptibly();
    try {
      // loop guards against spurious wakeups and against another taker winning the race
      while (delegate.isEmpty()) {
        notEmpty.await();
      }
      final Iterator<E> iter = delegate.iterator();
      x = iter.next();
      iter.remove();
      // cascade the wakeup so that a bulk insert, which signals once, can feed several takers
      signalIfNotEmpty();
    } finally {
      lock.unlock();
    }
    return x;
  }

  /**
   * Atomically removes all of the elements from this queue.
   * The queue will be empty after this call returns.
   */
  public void clear() {
    lock.lock();
    try {
      delegate.clear();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the number of elements in this queue.
   *
   * @return the number of elements in this queue
   */
  public int size() {
    lock.lock();
    try {
      return delegate.size();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the string form of the queued elements, in retrieval order, as rendered by the
   * underlying {@link LinkedHashSet}.
   */
  @Override
  public String toString() {
    lock.lock();
    try {
      return delegate.toString();
    } finally {
      lock.unlock();
    }
  }
}