001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.util.concurrent.Callable;
021import java.util.concurrent.Delayed;
022import java.util.concurrent.ExecutionException;
023import java.util.concurrent.RunnableScheduledFuture;
024import java.util.concurrent.ScheduledThreadPoolExecutor;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.ThreadLocalRandom;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * ScheduledThreadPoolExecutor that will add some jitter to the RunnableScheduledFuture.getDelay.
033 * This will spread out things on a distributed cluster.
034 */
035@InterfaceAudience.Private
036public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor {
037  private final double spread;
038
039  /**
040   * Main constructor.
041   * @param spread The percent up and down that RunnableScheduledFuture.getDelay should be jittered.
042   */
043  public JitterScheduledThreadPoolExecutorImpl(int corePoolSize, ThreadFactory threadFactory,
044    double spread) {
045    super(corePoolSize, threadFactory);
046    this.spread = spread;
047  }
048
049  @Override
050  protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(Runnable runnable,
051    java.util.concurrent.RunnableScheduledFuture<V> task) {
052    return new JitteredRunnableScheduledFuture<>(task);
053  }
054
055  @Override
056  protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(Callable<V> callable,
057    java.util.concurrent.RunnableScheduledFuture<V> task) {
058    return new JitteredRunnableScheduledFuture<>(task);
059  }
060
061  /**
062   * Class that basically just defers to the wrapped future. The only exception is getDelay
063   */
064  protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
065    private final RunnableScheduledFuture<V> wrapped;
066
067    JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) {
068      this.wrapped = wrapped;
069    }
070
071    @Override
072    public boolean isPeriodic() {
073      return wrapped.isPeriodic();
074    }
075
076    @Override
077    public long getDelay(TimeUnit unit) {
078      long baseDelay = wrapped.getDelay(unit);
079      long spreadTime = (long) (baseDelay * spread);
080      long delay = spreadTime <= 0
081        ? baseDelay
082        : baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime);
083      // Ensure that we don't roll over for nanoseconds.
084      return (delay < 0) ? baseDelay : delay;
085    }
086
087    @Override
088    public int compareTo(Delayed o) {
089      return wrapped.compareTo(o);
090    }
091
092    @Override
093    public boolean equals(Object obj) {
094      if (obj == this) {
095        return true;
096      }
097      return obj instanceof Delayed ? compareTo((Delayed) obj) == 0 : false;
098    }
099
100    @Override
101    public int hashCode() {
102      return this.wrapped.hashCode();
103    }
104
105    @Override
106    public void run() {
107      wrapped.run();
108    }
109
110    @Override
111    public boolean cancel(boolean mayInterruptIfRunning) {
112      return wrapped.cancel(mayInterruptIfRunning);
113    }
114
115    @Override
116    public boolean isCancelled() {
117      return wrapped.isCancelled();
118    }
119
120    @Override
121    public boolean isDone() {
122      return wrapped.isDone();
123    }
124
125    @Override
126    public V get() throws InterruptedException, ExecutionException {
127      return wrapped.get();
128    }
129
130    @Override
131    public V get(long timeout, TimeUnit unit)
132      throws InterruptedException, ExecutionException, TimeoutException {
133      return wrapped.get(timeout, unit);
134    }
135  }
136}