001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.util.concurrent.Callable; 021import java.util.concurrent.Delayed; 022import java.util.concurrent.ExecutionException; 023import java.util.concurrent.RunnableScheduledFuture; 024import java.util.concurrent.ScheduledThreadPoolExecutor; 025import java.util.concurrent.ThreadFactory; 026import java.util.concurrent.ThreadLocalRandom; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.TimeoutException; 029import org.apache.yetus.audience.InterfaceAudience; 030 031/** 032 * ScheduledThreadPoolExecutor that will add some jitter to the RunnableScheduledFuture.getDelay. 033 * This will spread out things on a distributed cluster. 034 */ 035@InterfaceAudience.Private 036public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor { 037 private final double spread; 038 039 /** 040 * Main constructor. 041 * @param spread The percent up and down that RunnableScheduledFuture.getDelay should be jittered. 042 */ 043 public JitterScheduledThreadPoolExecutorImpl(int corePoolSize, ThreadFactory threadFactory, 044 double spread) { 045 super(corePoolSize, threadFactory); 046 this.spread = spread; 047 } 048 049 @Override 050 protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(Runnable runnable, 051 java.util.concurrent.RunnableScheduledFuture<V> task) { 052 return new JitteredRunnableScheduledFuture<>(task); 053 } 054 055 @Override 056 protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(Callable<V> callable, 057 java.util.concurrent.RunnableScheduledFuture<V> task) { 058 return new JitteredRunnableScheduledFuture<>(task); 059 } 060 061 /** 062 * Class that basically just defers to the wrapped future. The only exception is getDelay 063 */ 064 protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> { 065 private final RunnableScheduledFuture<V> wrapped; 066 067 JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) { 068 this.wrapped = wrapped; 069 } 070 071 @Override 072 public boolean isPeriodic() { 073 return wrapped.isPeriodic(); 074 } 075 076 @Override 077 public long getDelay(TimeUnit unit) { 078 long baseDelay = wrapped.getDelay(unit); 079 long spreadTime = (long) (baseDelay * spread); 080 long delay = spreadTime <= 0 081 ? baseDelay 082 : baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime); 083 // Ensure that we don't roll over for nanoseconds. 084 return (delay < 0) ? baseDelay : delay; 085 } 086 087 @Override 088 public int compareTo(Delayed o) { 089 return wrapped.compareTo(o); 090 } 091 092 @Override 093 public boolean equals(Object obj) { 094 if (obj == this) { 095 return true; 096 } 097 return obj instanceof Delayed ? compareTo((Delayed) obj) == 0 : false; 098 } 099 100 @Override 101 public int hashCode() { 102 return this.wrapped.hashCode(); 103 } 104 105 @Override 106 public void run() { 107 wrapped.run(); 108 } 109 110 @Override 111 public boolean cancel(boolean mayInterruptIfRunning) { 112 return wrapped.cancel(mayInterruptIfRunning); 113 } 114 115 @Override 116 public boolean isCancelled() { 117 return wrapped.isCancelled(); 118 } 119 120 @Override 121 public boolean isDone() { 122 return wrapped.isDone(); 123 } 124 125 @Override 126 public V get() throws InterruptedException, ExecutionException { 127 return wrapped.get(); 128 } 129 130 @Override 131 public V get(long timeout, TimeUnit unit) 132 throws InterruptedException, ExecutionException, TimeoutException { 133 return wrapped.get(timeout, unit); 134 } 135 } 136}