/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.thrift;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BlockingQueue} of {@link Runnable}s that reports the time each element waited in the
 * queue, and the queue length, to {@link ThriftMetrics}.
 * <p>
 * Every inserted {@code Runnable} is wrapped in a {@link Call} that records its creation time; the
 * wrapped elements are stored in the underlying queue supplied at construction. Elements handed
 * back by the retrieval methods are the {@link Call} wrappers, which are themselves
 * {@code Runnable}.
 */
@InterfaceAudience.Private
public class CallQueue implements BlockingQueue<Runnable> {
  private static final Logger LOG = LoggerFactory.getLogger(CallQueue.class);

  private final BlockingQueue<Call> underlyingQueue;
  private final ThriftMetrics metrics;

  /**
   * @param underlyingQueue the queue that actually stores the wrapped calls
   * @param metrics         the sink that receives time-in-queue and queue-length updates
   */
  public CallQueue(BlockingQueue<Call> underlyingQueue, ThriftMetrics metrics) {
    this.underlyingQueue = underlyingQueue;
    this.metrics = metrics;
  }

  /** Returns the current {@link System#nanoTime()} reading used for queue-time measurement. */
  private static long now() {
    return System.nanoTime();
  }

  /**
   * Wrapper around a submitted {@link Runnable} that remembers when it was created so that the
   * time spent in the queue can be computed.
   */
  public static class Call implements Runnable {
    final long startTime;
    final Runnable underlyingRunnable;

    Call(Runnable underlyingRunnable) {
      this.underlyingRunnable = underlyingRunnable;
      this.startTime = now();
    }

    @Override
    public void run() {
      underlyingRunnable.run();
    }

    /** Returns the nanoseconds elapsed since this call was created. */
    public long timeInQueue() {
      return now() - startTime;
    }

    /**
     * A call is equal to another call wrapping an equal runnable, and also to a bare runnable
     * that equals the wrapped one. The second case is intentionally one-directional: a plain
     * {@code Runnable} will generally not report itself equal to a {@code Call}.
     */
    @Override
    public boolean equals(Object other) {
      if (other instanceof Call) {
        Call otherCall = (Call) (other);
        return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
      } else if (other instanceof Runnable) {
        return this.underlyingRunnable.equals(other);
      }
      return false;
    }

    @Override
    public int hashCode() {
      return this.underlyingRunnable.hashCode();
    }
  }

  /**
   * Converts a lookup argument into a {@link Call} so that the comparison performed by the
   * underlying queue goes through {@link Call#equals(Object)}. JDK queues evaluate
   * {@code argument.equals(storedElement)}; a bare {@code Runnable} argument would therefore never
   * match a stored {@code Call}.
   * @return the argument itself if it already is a {@code Call}, a temporary wrapper if it is
   *         another {@code Runnable}, or {@code null} if it is {@code null} or not a
   *         {@code Runnable} (and hence cannot be in this queue)
   */
  private static Call asCall(Object element) {
    if (element instanceof Call) {
      return (Call) element;
    }
    if (element instanceof Runnable) {
      // The probe is used for equality checks only and is never enqueued.
      return new Call((Runnable) element);
    }
    return null;
  }

  /** Publishes time-in-queue and current queue length for a dequeued call; no-op for null. */
  private void updateMetrics(Call result) {
    if (result == null) {
      return;
    }
    metrics.incTimeInQueue(result.timeInQueue());
    metrics.setCallQueueLen(this.size());
  }

  @Override
  public Runnable poll() {
    Call result = underlyingQueue.poll();
    updateMetrics(result);
    return result;
  }

  @Override
  public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
    Call result = underlyingQueue.poll(timeout, unit);
    updateMetrics(result);
    return result;
  }

  @Override
  public Runnable remove() {
    Call result = underlyingQueue.remove();
    updateMetrics(result);
    return result;
  }

  @Override
  public Runnable take() throws InterruptedException {
    Call result = underlyingQueue.take();
    updateMetrics(result);
    return result;
  }

  @Override
  public int drainTo(Collection<? super Runnable> destination) {
    return drainTo(destination, Integer.MAX_VALUE);
  }

  /**
   * Moves up to {@code maxElements} calls into {@code destination}, updating metrics for each.
   * @throws NullPointerException     if {@code destination} is null
   * @throws IllegalArgumentException if {@code destination} is this queue
   */
  @Override
  public int drainTo(Collection<? super Runnable> destination, int maxElements) {
    // Validate before draining: failing afterwards would silently drop the drained calls.
    if (destination == null) {
      throw new NullPointerException("destination");
    }
    if (destination == this) {
      throw new IllegalArgumentException("A BlockingQueue cannot drain to itself.");
    }
    List<Call> drained = new ArrayList<>();
    underlyingQueue.drainTo(drained, maxElements);
    for (Call r : drained) {
      updateMetrics(r);
    }
    destination.addAll(drained);
    int sz = drained.size();
    LOG.info("Elements drained: {}", sz);
    return sz;
  }

  @Override
  public boolean offer(Runnable element) {
    return underlyingQueue.offer(new Call(element));
  }

  @Override
  public boolean offer(Runnable element, long timeout, TimeUnit unit) throws InterruptedException {
    return underlyingQueue.offer(new Call(element), timeout, unit);
  }

  @Override
  public void put(Runnable element) throws InterruptedException {
    underlyingQueue.put(new Call(element));
  }

  @Override
  public boolean add(Runnable element) {
    return underlyingQueue.add(new Call(element));
  }

  /**
   * Wraps and adds every element of {@code elements}.
   * @return {@code true} if at least one element was added
   * @throws IllegalArgumentException if {@code elements} is this queue
   */
  @Override
  public boolean addAll(Collection<? extends Runnable> elements) {
    // Iterating this queue while appending to it is not meaningful; reject it up front.
    if (elements == this) {
      throw new IllegalArgumentException("A BlockingQueue cannot be added to itself.");
    }
    boolean changed = false;
    for (Runnable r : elements) {
      if (underlyingQueue.add(new Call(r))) {
        changed = true;
      }
    }
    return changed;
  }

  @Override
  public Runnable element() {
    return underlyingQueue.element();
  }

  @Override
  public Runnable peek() {
    return underlyingQueue.peek();
  }

  @Override
  public void clear() {
    underlyingQueue.clear();
  }

  /** Returns {@code true} if every element of {@code elements} is present, per {@link #contains}. */
  @Override
  public boolean containsAll(Collection<?> elements) {
    for (Object element : elements) {
      if (!contains(element)) {
        return false;
      }
    }
    return true;
  }

  @Override
  public boolean isEmpty() {
    return underlyingQueue.isEmpty();
  }

  /** Returns an iterator over the queued calls that delegates to the underlying queue's iterator. */
  @Override
  public Iterator<Runnable> iterator() {
    return new Iterator<Runnable>() {
      final Iterator<Call> underlyingIterator = underlyingQueue.iterator();

      @Override
      public Runnable next() {
        return underlyingIterator.next();
      }

      @Override
      public boolean hasNext() {
        return underlyingIterator.hasNext();
      }

      @Override
      public void remove() {
        underlyingIterator.remove();
      }
    };
  }

  @Override
  public boolean removeAll(Collection<?> elements) {
    return underlyingQueue.removeAll(elements);
  }

  @Override
  public boolean retainAll(Collection<?> elements) {
    return underlyingQueue.retainAll(elements);
  }

  @Override
  public int size() {
    return underlyingQueue.size();
  }

  @Override
  public Object[] toArray() {
    return underlyingQueue.toArray();
  }

  @Override
  public <T> T[] toArray(T[] array) {
    return underlyingQueue.toArray(array);
  }

  /**
   * Returns {@code true} if the queue holds a call equal to {@code element}, which may be either
   * the original {@code Runnable} or its {@link Call} wrapper.
   */
  @Override
  public boolean contains(Object element) {
    Call probe = asCall(element);
    return probe != null && underlyingQueue.contains(probe);
  }

  @Override
  public int remainingCapacity() {
    return underlyingQueue.remainingCapacity();
  }

  /**
   * Removes a single call equal to {@code element}, which may be either the original
   * {@code Runnable} or its {@link Call} wrapper.
   * @return {@code true} if a call was removed
   */
  @Override
  public boolean remove(Object element) {
    Call probe = asCall(element);
    return probe != null && underlyingQueue.remove(probe);
  }
}