1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase;
21
22 import org.apache.hadoop.hbase.classification.InterfaceAudience;
23
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.Delayed;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.RunnableScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34
35
36
37
38
39 @InterfaceAudience.Private
40 public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor {
41 private final double spread;
42
43
44
45
46
47 public JitterScheduledThreadPoolExecutorImpl(int corePoolSize,
48 ThreadFactory threadFactory,
49 double spread) {
50 super(corePoolSize, threadFactory);
51 this.spread = spread;
52 }
53
54 protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(
55 Runnable runnable, java.util.concurrent.RunnableScheduledFuture<V> task) {
56 return new JitteredRunnableScheduledFuture<>(task);
57 }
58
59
60 protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(
61 Callable<V> callable, java.util.concurrent.RunnableScheduledFuture<V> task) {
62 return new JitteredRunnableScheduledFuture<>(task);
63 }
64
65
66
67
68
69 protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
70 private final RunnableScheduledFuture<V> wrapped;
71 JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) {
72 this.wrapped = wrapped;
73 }
74
75 @Override
76 public boolean isPeriodic() {
77 return wrapped.isPeriodic();
78 }
79
80 @Override
81 public long getDelay(TimeUnit unit) {
82 long baseDelay = wrapped.getDelay(unit);
83 long spreadTime = (long) (baseDelay * spread);
84 long delay = spreadTime <= 0 ? baseDelay
85 : baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime);
86
87 return (delay < 0) ? baseDelay : delay;
88 }
89
90 @Override
91 public int compareTo(Delayed o) {
92 return wrapped.compareTo(o);
93 }
94
95 @Override
96 public boolean equals(Object obj) {
97 if (obj == this) {
98 return true;
99 }
100 return obj instanceof Delayed? compareTo((Delayed)obj) == 0: false;
101 }
102
103 @Override
104 public int hashCode() {
105 return this.wrapped.hashCode();
106 }
107
108 @Override
109 public void run() {
110 wrapped.run();
111 }
112
113 @Override
114 public boolean cancel(boolean mayInterruptIfRunning) {
115 return wrapped.cancel(mayInterruptIfRunning);
116 }
117
118 @Override
119 public boolean isCancelled() {
120 return wrapped.isCancelled();
121 }
122
123 @Override
124 public boolean isDone() {
125 return wrapped.isDone();
126 }
127
128 @Override
129 public V get() throws InterruptedException, ExecutionException {
130 return wrapped.get();
131 }
132
133 @Override
134 public V get(long timeout,
135 TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
136 return wrapped.get(timeout, unit);
137 }
138 }
139 }