001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.util;
020
021import java.util.Collection;
022import java.util.Iterator;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.DelayQueue;
025import java.util.concurrent.Delayed;
026import java.util.concurrent.TimeUnit;
027
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * A blocking queue implementation for adding a constant delay. Uses a DelayQueue as a backing store
032 * @param <E> type of elements
033 */
034@InterfaceAudience.Private
035public class ConstantDelayQueue<E> implements BlockingQueue<E> {
036
037  private static final class DelayedElement<T> implements Delayed {
038    T element;
039    long end;
040    public DelayedElement(T element, long delayMs) {
041      this.element = element;
042      this.end = EnvironmentEdgeManager.currentTime() + delayMs;
043    }
044
045    @Override
046    public int compareTo(Delayed o) {
047      long cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
048      return cmp == 0 ? 0 : ( cmp < 0 ? -1 : 1);
049    }
050
051    @Override
052    public long getDelay(TimeUnit unit) {
053      return unit.convert(end - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
054    }
055  }
056
057  private final long delayMs;
058
059  // backing DelayQueue
060  private DelayQueue<DelayedElement<E>> queue = new DelayQueue<>();
061
062  public ConstantDelayQueue(TimeUnit timeUnit, long delay) {
063    this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
064  }
065
066  @Override
067  public E remove() {
068    DelayedElement<E> el = queue.remove();
069    return el == null ? null : el.element;
070  }
071
072  @Override
073  public E poll() {
074    DelayedElement<E> el = queue.poll();
075    return el == null ? null : el.element;
076  }
077
078  @Override
079  public E element() {
080    DelayedElement<E> el = queue.element();
081    return el == null ? null : el.element;
082  }
083
084  @Override
085  public E peek() {
086    DelayedElement<E> el = queue.peek();
087    return el == null ? null : el.element;
088  }
089
090  @Override
091  public int size() {
092    return queue.size();
093  }
094
095  @Override
096  public boolean isEmpty() {
097    return queue.isEmpty();
098  }
099
100  @Override
101  public Iterator<E> iterator() {
102    throw new UnsupportedOperationException(); // not implemented yet
103  }
104
105  @Override
106  public Object[] toArray() {
107    throw new UnsupportedOperationException(); // not implemented yet
108  }
109
110  @Override
111  public <T> T[] toArray(T[] a) {
112    throw new UnsupportedOperationException(); // not implemented yet
113  }
114
115  @Override
116  public boolean containsAll(Collection<?> c) {
117    throw new UnsupportedOperationException(); // not implemented yet
118  }
119
120  @Override
121  public boolean addAll(Collection<? extends E> c) {
122    throw new UnsupportedOperationException(); // not implemented yet
123  }
124
125  @Override
126  public boolean removeAll(Collection<?> c) {
127    throw new UnsupportedOperationException(); // not implemented yet
128  }
129
130  @Override
131  public boolean retainAll(Collection<?> c) {
132    throw new UnsupportedOperationException(); // not implemented yet
133  }
134
135  @Override
136  public void clear() {
137    queue.clear();
138  }
139
140  @Override
141  public boolean add(E e) {
142    return queue.add(new DelayedElement<>(e, delayMs));
143  }
144
145  @Override
146  public boolean offer(E e) {
147    return queue.offer(new DelayedElement<>(e, delayMs));
148  }
149
150  @Override
151  public void put(E e) throws InterruptedException {
152    queue.put(new DelayedElement<>(e, delayMs));
153  }
154
155  @Override
156  public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
157    return queue.offer(new DelayedElement<>(e, delayMs), timeout, unit);
158  }
159
160  @Override
161  public E take() throws InterruptedException {
162    DelayedElement<E> el = queue.take();
163    return el == null ? null : el.element;
164  }
165
166  @Override
167  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
168    DelayedElement<E> el = queue.poll(timeout, unit);
169    return el == null ? null : el.element;
170  }
171
172  @Override
173  public int remainingCapacity() {
174    return queue.remainingCapacity();
175  }
176
177  @Override
178  public boolean remove(Object o) {
179    throw new UnsupportedOperationException(); // not implemented yet
180  }
181
182  @Override
183  public boolean contains(Object o) {
184    throw new UnsupportedOperationException(); // not implemented yet
185  }
186
187  @Override
188  public int drainTo(Collection<? super E> c) {
189    throw new UnsupportedOperationException(); // not implemented yet
190  }
191
192  @Override
193  public int drainTo(Collection<? super E> c, int maxElements) {
194    throw new UnsupportedOperationException(); // not implemented yet
195  }
196}