001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.DelayQueue; 024import java.util.concurrent.Delayed; 025import java.util.concurrent.TimeUnit; 026import org.apache.yetus.audience.InterfaceAudience; 027 028/** 029 * A blocking queue implementation for adding a constant delay. Uses a DelayQueue as a backing store 030 * @param <E> type of elements 031 */ 032@InterfaceAudience.Private 033public class ConstantDelayQueue<E> implements BlockingQueue<E> { 034 035 private static final class DelayedElement<T> implements Delayed { 036 T element; 037 long end; 038 039 public DelayedElement(T element, long delayMs) { 040 this.element = element; 041 this.end = EnvironmentEdgeManager.currentTime() + delayMs; 042 } 043 044 @Override 045 public int compareTo(Delayed o) { 046 long cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); 047 return cmp == 0 ? 0 : (cmp < 0 ? -1 : 1); 048 } 049 050 @Override 051 public long getDelay(TimeUnit unit) { 052 return unit.convert(end - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS); 053 } 054 } 055 056 private final long delayMs; 057 058 // backing DelayQueue 059 private DelayQueue<DelayedElement<E>> queue = new DelayQueue<>(); 060 061 public ConstantDelayQueue(TimeUnit timeUnit, long delay) { 062 this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit); 063 } 064 065 @Override 066 public E remove() { 067 DelayedElement<E> el = queue.remove(); 068 return el == null ? null : el.element; 069 } 070 071 @Override 072 public E poll() { 073 DelayedElement<E> el = queue.poll(); 074 return el == null ? null : el.element; 075 } 076 077 @Override 078 public E element() { 079 DelayedElement<E> el = queue.element(); 080 return el == null ? null : el.element; 081 } 082 083 @Override 084 public E peek() { 085 DelayedElement<E> el = queue.peek(); 086 return el == null ? null : el.element; 087 } 088 089 @Override 090 public int size() { 091 return queue.size(); 092 } 093 094 @Override 095 public boolean isEmpty() { 096 return queue.isEmpty(); 097 } 098 099 @Override 100 public Iterator<E> iterator() { 101 throw new UnsupportedOperationException(); // not implemented yet 102 } 103 104 @Override 105 public Object[] toArray() { 106 throw new UnsupportedOperationException(); // not implemented yet 107 } 108 109 @Override 110 public <T> T[] toArray(T[] a) { 111 throw new UnsupportedOperationException(); // not implemented yet 112 } 113 114 @Override 115 public boolean containsAll(Collection<?> c) { 116 throw new UnsupportedOperationException(); // not implemented yet 117 } 118 119 @Override 120 public boolean addAll(Collection<? extends E> c) { 121 throw new UnsupportedOperationException(); // not implemented yet 122 } 123 124 @Override 125 public boolean removeAll(Collection<?> c) { 126 throw new UnsupportedOperationException(); // not implemented yet 127 } 128 129 @Override 130 public boolean retainAll(Collection<?> c) { 131 throw new UnsupportedOperationException(); // not implemented yet 132 } 133 134 @Override 135 public void clear() { 136 queue.clear(); 137 } 138 139 @Override 140 public boolean add(E e) { 141 return queue.add(new DelayedElement<>(e, delayMs)); 142 } 143 144 @Override 145 public boolean offer(E e) { 146 return queue.offer(new DelayedElement<>(e, delayMs)); 147 } 148 149 @Override 150 public void put(E e) throws InterruptedException { 151 queue.put(new DelayedElement<>(e, delayMs)); 152 } 153 154 @Override 155 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 156 return queue.offer(new DelayedElement<>(e, delayMs), timeout, unit); 157 } 158 159 @Override 160 public E take() throws InterruptedException { 161 DelayedElement<E> el = queue.take(); 162 return el == null ? null : el.element; 163 } 164 165 @Override 166 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 167 DelayedElement<E> el = queue.poll(timeout, unit); 168 return el == null ? null : el.element; 169 } 170 171 @Override 172 public int remainingCapacity() { 173 return queue.remainingCapacity(); 174 } 175 176 @Override 177 public boolean remove(Object o) { 178 throw new UnsupportedOperationException(); // not implemented yet 179 } 180 181 @Override 182 public boolean contains(Object o) { 183 throw new UnsupportedOperationException(); // not implemented yet 184 } 185 186 @Override 187 public int drainTo(Collection<? super E> c) { 188 throw new UnsupportedOperationException(); // not implemented yet 189 } 190 191 @Override 192 public int drainTo(Collection<? super E> c, int maxElements) { 193 throw new UnsupportedOperationException(); // not implemented yet 194 } 195}