001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase;
021
022import java.util.concurrent.Callable;
023import java.util.concurrent.Delayed;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.RunnableScheduledFuture;
026import java.util.concurrent.ScheduledThreadPoolExecutor;
027import java.util.concurrent.ThreadFactory;
028import java.util.concurrent.ThreadLocalRandom;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.TimeoutException;
031
032import org.apache.yetus.audience.InterfaceAudience;
033
/**
 * ScheduledThreadPoolExecutor that adds some jitter to the delay reported by
 * RunnableScheduledFuture.getDelay for the tasks it schedules.
 * <p>
 * Jittering the delay spreads scheduled work out over time on a distributed cluster.
 */
039@InterfaceAudience.Private
040public class JitterScheduledThreadPoolExecutorImpl extends ScheduledThreadPoolExecutor {
041  private final double spread;
042
043  /**
044   * Main constructor.
045   * @param spread The percent up and down that RunnableScheduledFuture.getDelay should be jittered.
046   */
047  public JitterScheduledThreadPoolExecutorImpl(int corePoolSize,
048                                               ThreadFactory threadFactory,
049                                               double spread) {
050    super(corePoolSize, threadFactory);
051    this.spread = spread;
052  }
053
054  @Override
055  protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(
056      Runnable runnable, java.util.concurrent.RunnableScheduledFuture<V> task) {
057    return new JitteredRunnableScheduledFuture<>(task);
058  }
059
060  @Override
061  protected <V> java.util.concurrent.RunnableScheduledFuture<V> decorateTask(
062      Callable<V> callable, java.util.concurrent.RunnableScheduledFuture<V> task) {
063    return new JitteredRunnableScheduledFuture<>(task);
064  }
065
066  /**
067   * Class that basically just defers to the wrapped future.
068   * The only exception is getDelay
069   */
070  protected class JitteredRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
071    private final RunnableScheduledFuture<V> wrapped;
072    JitteredRunnableScheduledFuture(RunnableScheduledFuture<V> wrapped) {
073      this.wrapped = wrapped;
074    }
075
076    @Override
077    public boolean isPeriodic() {
078      return wrapped.isPeriodic();
079    }
080
081    @Override
082    public long getDelay(TimeUnit unit) {
083      long baseDelay = wrapped.getDelay(unit);
084      long spreadTime = (long) (baseDelay * spread);
085      long delay = spreadTime <= 0 ? baseDelay
086          : baseDelay + ThreadLocalRandom.current().nextLong(-spreadTime, spreadTime);
087      // Ensure that we don't roll over for nanoseconds.
088      return (delay < 0) ? baseDelay : delay;
089    }
090
091    @Override
092    public int compareTo(Delayed o) {
093      return wrapped.compareTo(o);
094    }
095
096    @Override
097    public boolean equals(Object obj) {
098      if (obj == this) {
099        return true;
100      }
101      return obj instanceof Delayed? compareTo((Delayed)obj) == 0: false;
102    }
103
104    @Override
105    public int hashCode() {
106      return this.wrapped.hashCode();
107    }
108
109    @Override
110    public void run() {
111      wrapped.run();
112    }
113
114    @Override
115    public boolean cancel(boolean mayInterruptIfRunning) {
116      return wrapped.cancel(mayInterruptIfRunning);
117    }
118
119    @Override
120    public boolean isCancelled() {
121      return wrapped.isCancelled();
122    }
123
124    @Override
125    public boolean isDone() {
126      return wrapped.isDone();
127    }
128
129    @Override
130    public V get() throws InterruptedException, ExecutionException {
131      return wrapped.get();
132    }
133
134    @Override
135    public V get(long timeout,
136                 TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
137      return wrapped.get(timeout, unit);
138    }
139  }
140}