001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Iterator; 024import java.util.List; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.TimeUnit; 027 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * A BlockingQueue reports waiting time in queue and queue length to 034 * ThriftMetrics. 035 */ 036@InterfaceAudience.Private 037public class CallQueue implements BlockingQueue<Runnable> { 038 private static final Logger LOG = LoggerFactory.getLogger(CallQueue.class); 039 040 private final BlockingQueue<Call> underlyingQueue; 041 private final ThriftMetrics metrics; 042 043 public CallQueue(BlockingQueue<Call> underlyingQueue, 044 ThriftMetrics metrics) { 045 this.underlyingQueue = underlyingQueue; 046 this.metrics = metrics; 047 } 048 049 private static long now() { 050 return System.nanoTime(); 051 } 052 053 public static class Call implements Runnable { 054 final long startTime; 055 final Runnable underlyingRunnable; 056 057 Call(Runnable underlyingRunnable) { 058 this.underlyingRunnable = underlyingRunnable; 059 this.startTime = now(); 060 } 061 062 @Override 063 public void run() { 064 underlyingRunnable.run(); 065 } 066 067 public long timeInQueue() { 068 return now() - startTime; 069 } 070 071 @Override 072 public boolean equals(Object other) { 073 if (other instanceof Call) { 074 Call otherCall = (Call)(other); 075 return this.underlyingRunnable.equals(otherCall.underlyingRunnable); 076 } else if (other instanceof Runnable) { 077 return this.underlyingRunnable.equals(other); 078 } 079 return false; 080 } 081 082 @Override 083 public int hashCode() { 084 return this.underlyingRunnable.hashCode(); 085 } 086 } 087 088 @Override 089 public Runnable poll() { 090 Call result = underlyingQueue.poll(); 091 updateMetrics(result); 092 return result; 093 } 094 095 private void updateMetrics(Call result) { 096 if (result == null) { 097 return; 098 } 099 metrics.incTimeInQueue(result.timeInQueue()); 100 metrics.setCallQueueLen(this.size()); 101 } 102 103 @Override 104 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 105 Call result = underlyingQueue.poll(timeout, unit); 106 updateMetrics(result); 107 return result; 108 } 109 110 @Override 111 public Runnable remove() { 112 Call result = underlyingQueue.remove(); 113 updateMetrics(result); 114 return result; 115 } 116 117 @Override 118 public Runnable take() throws InterruptedException { 119 Call result = underlyingQueue.take(); 120 updateMetrics(result); 121 return result; 122 } 123 124 @Override 125 public int drainTo(Collection<? super Runnable> destination) { 126 return drainTo(destination, Integer.MAX_VALUE); 127 } 128 129 @Override 130 public int drainTo(Collection<? super Runnable> destination, 131 int maxElements) { 132 if (destination == this) { 133 throw new IllegalArgumentException( 134 "A BlockingQueue cannot drain to itself."); 135 } 136 List<Call> drained = new ArrayList<>(); 137 underlyingQueue.drainTo(drained, maxElements); 138 for (Call r : drained) { 139 updateMetrics(r); 140 } 141 destination.addAll(drained); 142 int sz = drained.size(); 143 LOG.info("Elements drained: " + sz); 144 return sz; 145 } 146 147 148 @Override 149 public boolean offer(Runnable element) { 150 return underlyingQueue.offer(new Call(element)); 151 } 152 153 @Override 154 public boolean offer(Runnable element, long timeout, TimeUnit unit) 155 throws InterruptedException { 156 return underlyingQueue.offer(new Call(element), timeout, unit); 157 } 158 @Override 159 public void put(Runnable element) throws InterruptedException { 160 underlyingQueue.put(new Call(element)); 161 } 162 163 @Override 164 public boolean add(Runnable element) { 165 return underlyingQueue.add(new Call(element)); 166 } 167 168 @Override 169 public boolean addAll(Collection<? extends Runnable> elements) { 170 int added = 0; 171 for (Runnable r : elements) { 172 added += underlyingQueue.add(new Call(r)) ? 1 : 0; 173 } 174 return added != 0; 175 } 176 177 @Override 178 public Runnable element() { 179 return underlyingQueue.element(); 180 } 181 182 @Override 183 public Runnable peek() { 184 return underlyingQueue.peek(); 185 } 186 187 @Override 188 public void clear() { 189 underlyingQueue.clear(); 190 } 191 192 @Override 193 public boolean containsAll(Collection<?> elements) { 194 return underlyingQueue.containsAll(elements); 195 } 196 197 @Override 198 public boolean isEmpty() { 199 return underlyingQueue.isEmpty(); 200 } 201 202 @Override 203 public Iterator<Runnable> iterator() { 204 return new Iterator<Runnable>() { 205 final Iterator<Call> underlyingIterator = underlyingQueue.iterator(); 206 @Override 207 public Runnable next() { 208 return underlyingIterator.next(); 209 } 210 211 @Override 212 public boolean hasNext() { 213 return underlyingIterator.hasNext(); 214 } 215 216 @Override 217 public void remove() { 218 underlyingIterator.remove(); 219 } 220 }; 221 } 222 223 @Override 224 public boolean removeAll(Collection<?> elements) { 225 return underlyingQueue.removeAll(elements); 226 } 227 228 @Override 229 public boolean retainAll(Collection<?> elements) { 230 return underlyingQueue.retainAll(elements); 231 } 232 233 @Override 234 public int size() { 235 return underlyingQueue.size(); 236 } 237 238 @Override 239 public Object[] toArray() { 240 return underlyingQueue.toArray(); 241 } 242 243 @Override 244 public <T> T[] toArray(T[] array) { 245 return underlyingQueue.toArray(array); 246 } 247 248 @Override 249 public boolean contains(Object element) { 250 return underlyingQueue.contains(element); 251 } 252 253 @Override 254 public int remainingCapacity() { 255 return underlyingQueue.remainingCapacity(); 256 } 257 258 @Override 259 public boolean remove(Object element) { 260 return underlyingQueue.remove(element); 261 } 262}