View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase;
21  
22  import org.apache.hadoop.hbase.classification.InterfaceAudience;
23  
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.Delayed;
26  import java.util.concurrent.ExecutionException;
27  import java.util.concurrent.RunnableScheduledFuture;
28  import java.util.concurrent.ScheduledThreadPoolExecutor;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.ThreadLocalRandom;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.TimeoutException;
33  
34  /**
35   * ScheduledThreadPoolExecutor that will add some jitter to the RunnableScheduledFuture.getDelay.
36   *
37   * This will spread out things on a distributed cluster.
38   */
39  @InterfaceAudience.Private
public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor {
  /** Fraction of the base delay by which a reported delay may be moved up or down. */
  private final double spread;

  /**
   * Main constructor.
   * @param corePoolSize passed through to {@link ScheduledThreadPoolExecutor}.
   * @param threadFactory passed through to {@link ScheduledThreadPoolExecutor}.
   * @param spread The fraction up and down that RunnableScheduledFuture.getDelay should be
   *               jittered; the base delay is multiplied by this value, so 0.1 means the
   *               reported delay may move by up to 10% of the base delay in either direction.
   *               A value that yields a non-positive spread time disables jitter.
   */
  public JitterScheduledThreadPoolExecutorImpl(int corePoolSize,
                                               ThreadFactory threadFactory,
                                               double spread) {
    super(corePoolSize, threadFactory);
    this.spread = spread;
  }

  /**
   * Wraps the task created for a {@link Runnable} so that its reported delay is jittered.
   * @param runnable the submitted runnable (not used by this implementation).
   * @param task the task created to execute the runnable.
   * @return a {@link JitteredRunnableScheduledFuture} delegating to {@code task}.
   */
  @Override
  protected <V> RunnableScheduledFuture<V> decorateTask(
      Runnable runnable, RunnableScheduledFuture<V> task) {
    return new JitteredRunnableScheduledFuture<>(task);
  }

  /**
   * Wraps the task created for a {@link Callable} so that its reported delay is jittered.
   * @param callable the submitted callable (not used by this implementation).
   * @param task the task created to execute the callable.
   * @return a {@link JitteredRunnableScheduledFuture} delegating to {@code task}.
   */
  @Override
  protected <V> RunnableScheduledFuture<V> decorateTask(
      Callable<V> callable, RunnableScheduledFuture<V> task) {
    return new JitteredRunnableScheduledFuture<>(task);
  }

  /**
   * Class that basically just defers to the wrapped future.
   * The exceptions are getDelay, which adds jitter, and compareTo/equals, which look through
   * the wrapper so that two wrappers are compared by their wrapped futures.
   */
  protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
    private final RunnableScheduledFuture<V> wrapped;

    JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) {
      this.wrapped = wrapped;
    }

    @Override
    public boolean isPeriodic() {
      return wrapped.isPeriodic();
    }

    /**
     * Returns the wrapped future's delay moved by a random amount drawn from
     * {@code [-baseDelay * spread, baseDelay * spread)}. A fresh random value is drawn on
     * every call. The un-jittered delay is returned when the spread time is not positive
     * or when the jittered value would be negative.
     */
    @Override
    public long getDelay(TimeUnit unit) {
      long baseDelay = wrapped.getDelay(unit);
      long spreadTime = (long) (baseDelay * spread);
      long delay = spreadTime <= 0 ? baseDelay
          : baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime);
      // Ensure that we don't roll over for nanoseconds.
      return (delay < 0) ? baseDelay : delay;
    }

    /**
     * Orders this future relative to {@code o}. When {@code o} is also a jittered wrapper the
     * two wrapped futures are compared directly; handing the wrapper itself to the wrapped
     * future would make the result depend on {@code o}'s randomly jittered delay and so be
     * neither reflexive nor antisymmetric.
     */
    @Override
    public int compareTo(Delayed o) {
      if (o == this) {
        return 0;
      }
      if (o instanceof JitteredRunnableScheduledFuture) {
        return wrapped.compareTo(((JitteredRunnableScheduledFuture<?>) o).wrapped);
      }
      return wrapped.compareTo(o);
    }

    /**
     * Two jittered futures are equal when their wrapped futures are equal. This keeps equals
     * symmetric and consistent with {@link #hashCode()}, which is taken from the wrapped future.
     */
    @Override
    public boolean equals(Object obj) {
      if (obj == this) {
        return true;
      }
      if (!(obj instanceof JitteredRunnableScheduledFuture)) {
        return false;
      }
      return wrapped.equals(((JitteredRunnableScheduledFuture<?>) obj).wrapped);
    }

    @Override
    public int hashCode() {
      return this.wrapped.hashCode();
    }

    @Override
    public void run() {
      wrapped.run();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return wrapped.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
      return wrapped.isCancelled();
    }

    @Override
    public boolean isDone() {
      return wrapped.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
      return wrapped.get();
    }

    @Override
    public V get(long timeout,
                 TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
      return wrapped.get(timeout, unit);
    }
  }
}