001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase; 021 022import java.util.concurrent.Callable; 023import java.util.concurrent.Delayed; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.RunnableScheduledFuture; 026import java.util.concurrent.ScheduledThreadPoolExecutor; 027import java.util.concurrent.ThreadFactory; 028import java.util.concurrent.ThreadLocalRandom; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.TimeoutException; 031 032import org.apache.yetus.audience.InterfaceAudience; 033 034/** 035 * ScheduledThreadPoolExecutor that will add some jitter to the RunnableScheduledFuture.getDelay. 036 * 037 * This will spread out things on a distributed cluster. 038 */ 039@InterfaceAudience.Private 040public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor { 041 private final double spread; 042 043 /** 044 * Main constructor. 045 * @param spread The percent up and down that RunnableScheduledFuture.getDelay should be jittered. 046 */ 047 public JitterScheduledThreadPoolExecutorImpl(int corePoolSize, 048 ThreadFactory threadFactory, 049 double spread) { 050 super(corePoolSize, threadFactory); 051 this.spread = spread; 052 } 053 054 @Override 055 protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask( 056 Runnable runnable, java.util.concurrent.RunnableScheduledFuture<V> task) { 057 return new JitteredRunnableScheduledFuture<>(task); 058 } 059 060 @Override 061 protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask( 062 Callable<V> callable, java.util.concurrent.RunnableScheduledFuture<V> task) { 063 return new JitteredRunnableScheduledFuture<>(task); 064 } 065 066 /** 067 * Class that basically just defers to the wrapped future. 068 * The only exception is getDelay 069 */ 070 protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> { 071 private final RunnableScheduledFuture<V> wrapped; 072 JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) { 073 this.wrapped = wrapped; 074 } 075 076 @Override 077 public boolean isPeriodic() { 078 return wrapped.isPeriodic(); 079 } 080 081 @Override 082 public long getDelay(TimeUnit unit) { 083 long baseDelay = wrapped.getDelay(unit); 084 long spreadTime = (long) (baseDelay * spread); 085 long delay = spreadTime <= 0 ? baseDelay 086 : baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime); 087 // Ensure that we don't roll over for nanoseconds. 088 return (delay < 0) ? baseDelay : delay; 089 } 090 091 @Override 092 public int compareTo(Delayed o) { 093 return wrapped.compareTo(o); 094 } 095 096 @Override 097 public boolean equals(Object obj) { 098 if (obj == this) { 099 return true; 100 } 101 return obj instanceof Delayed? compareTo((Delayed)obj) == 0: false; 102 } 103 104 @Override 105 public int hashCode() { 106 return this.wrapped.hashCode(); 107 } 108 109 @Override 110 public void run() { 111 wrapped.run(); 112 } 113 114 @Override 115 public boolean cancel(boolean mayInterruptIfRunning) { 116 return wrapped.cancel(mayInterruptIfRunning); 117 } 118 119 @Override 120 public boolean isCancelled() { 121 return wrapped.isCancelled(); 122 } 123 124 @Override 125 public boolean isDone() { 126 return wrapped.isDone(); 127 } 128 129 @Override 130 public V get() throws InterruptedException, ExecutionException { 131 return wrapped.get(); 132 } 133 134 @Override 135 public V get(long timeout, 136 TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 137 return wrapped.get(timeout, unit); 138 } 139 } 140}