001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * A specialized collection that holds pending work for the {@link RegionNormalizerWorker}. It is an
033 * ordered collection class that has the following properties:
034 * <ul>
035 * <li>Guarantees uniqueness of elements, as a {@link Set}.</li>
036 * <li>Consumers retrieve objects from the head, as a {@link Queue}, via {@link #take()}.</li>
037 * <li>Work is retrieved on a FIFO policy.</li>
038 * <li>Work retrieval blocks the calling thread until new work is available, as a
039 * {@link BlockingQueue}.</li>
040 * <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
041 * </ul>
042 */
043@InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> {

  /**
   * Underlying storage structure that gives us the Set behavior and FIFO retrieval policy.
   * Reassigned by {@link #putAllFirst(Collection)}, so it cannot be {@code final}; all reads and
   * writes happen under {@link #lock}, which provides the necessary visibility guarantees.
   */
  private LinkedHashSet<E> delegate;

  /** Guards all access to {@link #delegate}: write lock for mutations, read lock for queries. */
  private final ReentrantReadWriteLock lock;
  /**
   * Signaled when an element becomes available. Takers wait on this condition only while the
   * queue is empty (see {@link #take()}), a fact the producers rely on when deciding whether a
   * signal is needed.
   */
  private final Condition notEmpty;

  RegionNormalizerWorkQueue() {
    delegate = new LinkedHashSet<>();
    lock = new ReentrantReadWriteLock();
    notEmpty = lock.writeLock().newCondition();
  }

  /**
   * Inserts the specified element at the tail of the queue, if it's not already present.
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   */
  public void put(E e) {
    Objects.requireNonNull(e);
    lock.writeLock().lock();
    try {
      // Signal only when the element was actually added. If add() returns false, the queue
      // already contained e and was therefore non-empty, so no taker can be waiting.
      if (delegate.add(e)) {
        notEmpty.signal();
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Inserts the specified element at the head of the queue. If the element is already present
   * elsewhere in the queue, it is moved to the head.
   * @param e the element to add
   * @throws NullPointerException if {@code e} is null
   */
  public void putFirst(E e) {
    Objects.requireNonNull(e);
    putAllFirst(Collections.singleton(e));
  }

  /**
   * Inserts the specified elements at the tail of the queue. Any elements already present in the
   * queue are ignored.
   * @param c the elements to add
   * @throws NullPointerException if {@code c} is null
   */
  public void putAll(Collection<? extends E> c) {
    Objects.requireNonNull(c);
    lock.writeLock().lock();
    try {
      // addAll() returns true iff at least one new element was inserted. As in put(), a
      // no-op addAll means the queue state didn't change, so no waiting taker needs a signal.
      if (delegate.addAll(c)) {
        notEmpty.signal();
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Inserts the specified elements at the head of the queue. Elements already present in the
   * queue are moved to the head.
   * @param c the elements to add
   * @throws NullPointerException if {@code c} is null
   */
  public void putAllFirst(Collection<? extends E> c) {
    Objects.requireNonNull(c);
    if (c.isEmpty()) {
      // Nothing to prepend; avoid rebuilding the delegate and signaling for no reason.
      return;
    }
    lock.writeLock().lock();
    try {
      // LinkedHashSet iterates in insertion order, so rebuild with c's elements first.
      // Duplicates between c and the existing queue keep c's (head) position.
      final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
      copy.addAll(c);
      copy.addAll(delegate);
      delegate = copy;
      // c is non-empty, so the rebuilt queue is guaranteed non-empty.
      notEmpty.signal();
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, waiting if necessary until an element becomes
   * available.
   * @return the head of this queue
   * @throws InterruptedException if interrupted while waiting
   */
  public E take() throws InterruptedException {
    E x;
    // Take a write lock. If the delegate's queue is empty we need it to await(), which will
    // drop the lock, then reacquire it; or if the queue is not empty we will use an iterator
    // to mutate the head.
    lock.writeLock().lockInterruptibly();
    try {
      while (delegate.isEmpty()) {
        notEmpty.await(); // await drops the lock, then reacquires it
      }
      final Iterator<E> iter = delegate.iterator();
      x = iter.next();
      iter.remove();
      // Cascade the signal: if work remains, wake the next waiting taker (a single signal()
      // from a producer only wakes one thread, so takers propagate it while work is left).
      if (!delegate.isEmpty()) {
        notEmpty.signal();
      }
    } finally {
      lock.writeLock().unlock();
    }
    return x;
  }

  /**
   * Atomically removes all of the elements from this queue. The queue will be empty after this call
   * returns.
   */
  public void clear() {
    lock.writeLock().lock();
    try {
      delegate.clear();
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Returns the number of elements in this queue.
   * @return the number of elements in this queue
   */
  public int size() {
    lock.readLock().lock();
    try {
      return delegate.size();
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public String toString() {
    lock.readLock().lock();
    try {
      return delegate.toString();
    } finally {
      lock.readLock().unlock();
    }
  }
}