001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.util; 020 021import java.util.Collection; 022import java.util.Iterator; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.DelayQueue; 025import java.util.concurrent.Delayed; 026import java.util.concurrent.TimeUnit; 027 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * A blocking queue implementation for adding a constant delay. Uses a DelayQueue as a backing store 032 * @param <E> type of elements 033 */ 034@InterfaceAudience.Private 035public class ConstantDelayQueue<E> implements BlockingQueue<E> { 036 037 private static final class DelayedElement<T> implements Delayed { 038 T element; 039 long end; 040 public DelayedElement(T element, long delayMs) { 041 this.element = element; 042 this.end = EnvironmentEdgeManager.currentTime() + delayMs; 043 } 044 045 @Override 046 public int compareTo(Delayed o) { 047 long cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); 048 return cmp == 0 ? 0 : ( cmp < 0 ? -1 : 1); 049 } 050 051 @Override 052 public long getDelay(TimeUnit unit) { 053 return unit.convert(end - System.currentTimeMillis(), TimeUnit.MILLISECONDS); 054 } 055 } 056 057 private final long delayMs; 058 059 // backing DelayQueue 060 private DelayQueue<DelayedElement<E>> queue = new DelayQueue<>(); 061 062 public ConstantDelayQueue(TimeUnit timeUnit, long delay) { 063 this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit); 064 } 065 066 @Override 067 public E remove() { 068 DelayedElement<E> el = queue.remove(); 069 return el == null ? null : el.element; 070 } 071 072 @Override 073 public E poll() { 074 DelayedElement<E> el = queue.poll(); 075 return el == null ? null : el.element; 076 } 077 078 @Override 079 public E element() { 080 DelayedElement<E> el = queue.element(); 081 return el == null ? null : el.element; 082 } 083 084 @Override 085 public E peek() { 086 DelayedElement<E> el = queue.peek(); 087 return el == null ? null : el.element; 088 } 089 090 @Override 091 public int size() { 092 return queue.size(); 093 } 094 095 @Override 096 public boolean isEmpty() { 097 return queue.isEmpty(); 098 } 099 100 @Override 101 public Iterator<E> iterator() { 102 throw new UnsupportedOperationException(); // not implemented yet 103 } 104 105 @Override 106 public Object[] toArray() { 107 throw new UnsupportedOperationException(); // not implemented yet 108 } 109 110 @Override 111 public <T> T[] toArray(T[] a) { 112 throw new UnsupportedOperationException(); // not implemented yet 113 } 114 115 @Override 116 public boolean containsAll(Collection<?> c) { 117 throw new UnsupportedOperationException(); // not implemented yet 118 } 119 120 @Override 121 public boolean addAll(Collection<? extends E> c) { 122 throw new UnsupportedOperationException(); // not implemented yet 123 } 124 125 @Override 126 public boolean removeAll(Collection<?> c) { 127 throw new UnsupportedOperationException(); // not implemented yet 128 } 129 130 @Override 131 public boolean retainAll(Collection<?> c) { 132 throw new UnsupportedOperationException(); // not implemented yet 133 } 134 135 @Override 136 public void clear() { 137 queue.clear(); 138 } 139 140 @Override 141 public boolean add(E e) { 142 return queue.add(new DelayedElement<>(e, delayMs)); 143 } 144 145 @Override 146 public boolean offer(E e) { 147 return queue.offer(new DelayedElement<>(e, delayMs)); 148 } 149 150 @Override 151 public void put(E e) throws InterruptedException { 152 queue.put(new DelayedElement<>(e, delayMs)); 153 } 154 155 @Override 156 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 157 return queue.offer(new DelayedElement<>(e, delayMs), timeout, unit); 158 } 159 160 @Override 161 public E take() throws InterruptedException { 162 DelayedElement<E> el = queue.take(); 163 return el == null ? null : el.element; 164 } 165 166 @Override 167 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 168 DelayedElement<E> el = queue.poll(timeout, unit); 169 return el == null ? null : el.element; 170 } 171 172 @Override 173 public int remainingCapacity() { 174 return queue.remainingCapacity(); 175 } 176 177 @Override 178 public boolean remove(Object o) { 179 throw new UnsupportedOperationException(); // not implemented yet 180 } 181 182 @Override 183 public boolean contains(Object o) { 184 throw new UnsupportedOperationException(); // not implemented yet 185 } 186 187 @Override 188 public int drainTo(Collection<? super E> c) { 189 throw new UnsupportedOperationException(); // not implemented yet 190 } 191 192 @Override 193 public int drainTo(Collection<? super E> c, int maxElements) { 194 throw new UnsupportedOperationException(); // not implemented yet 195 } 196}