001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.util.Map;
022import java.util.concurrent.ConcurrentSkipListMap;
023import java.util.concurrent.Future;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.ScheduledExecutorService;
026import java.util.concurrent.ScheduledFuture;
027import java.util.concurrent.ScheduledThreadPoolExecutor;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.regex.Pattern;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.trace.TraceUtil;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@InterfaceAudience.Private
044public final class PrefetchExecutor {
045
046  private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
047  /** Wait time in miliseconds before executing prefetch */
048  public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
049  public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation";
050  public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f;
051
052  /** Futures for tracking block prefetch activity */
053  private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
054  /** Runnables for resetting the prefetch activity */
055  private static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>();
056  /** Executor pool shared among all HFiles for block prefetch */
057  private static final ScheduledExecutorService prefetchExecutorPool;
058  /** Delay before beginning prefetch */
059  private static int prefetchDelayMillis;
060  /** Variation in prefetch delay times, to mitigate stampedes */
061  private static float prefetchDelayVariation;
062  static {
063    // Consider doing this on demand with a configuration passed in rather
064    // than in a static initializer.
065    Configuration conf = HBaseConfiguration.create();
066    // 1s here for tests, consider 30s in hbase-default.xml
067    // Set to 0 for no delay
068    prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
069    prefetchDelayVariation =
070      conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
071    int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
072    prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
073      @Override
074      public Thread newThread(Runnable r) {
075        String name = "hfile-prefetch-" + EnvironmentEdgeManager.currentTime();
076        Thread t = new Thread(r, name);
077        t.setDaemon(true);
078        return t;
079      }
080    });
081  }
082
083  // TODO: We want HFile, which is where the blockcache lives, to handle
084  // prefetching of file blocks but the Store level is where path convention
085  // knowledge should be contained
086  private static final Pattern prefetchPathExclude =
087    Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.")
088      + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR
089      + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
090
091  public static void request(Path path, Runnable runnable) {
092    if (!prefetchPathExclude.matcher(path.toString()).find()) {
093      long delay;
094      if (prefetchDelayMillis > 0) {
095        delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
096          + (prefetchDelayMillis * (prefetchDelayVariation / 2)
097            * ThreadLocalRandom.current().nextFloat()));
098      } else {
099        delay = 0;
100      }
101      try {
102        LOG.debug("Prefetch requested for {}, delay={} ms", path, delay);
103        final Runnable tracedRunnable =
104          TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request");
105        final Future<?> future =
106          prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
107        prefetchFutures.put(path, future);
108        prefetchRunnable.put(path, runnable);
109      } catch (RejectedExecutionException e) {
110        prefetchFutures.remove(path);
111        prefetchRunnable.remove(path);
112        LOG.warn("Prefetch request rejected for {}", path);
113      }
114    }
115  }
116
117  public static void complete(Path path) {
118    prefetchFutures.remove(path);
119    prefetchRunnable.remove(path);
120    if (LOG.isDebugEnabled()) {
121      LOG.debug("Prefetch completed for {}", path.getName());
122    }
123  }
124
125  public static void cancel(Path path) {
126    Future<?> future = prefetchFutures.get(path);
127    if (future != null) {
128      // ok to race with other cancellation attempts
129      future.cancel(true);
130      prefetchFutures.remove(path);
131      prefetchRunnable.remove(path);
132      LOG.debug("Prefetch cancelled for {}", path);
133    }
134  }
135
136  public static void interrupt(Path path) {
137    Future<?> future = prefetchFutures.get(path);
138    if (future != null) {
139      prefetchFutures.remove(path);
140      // ok to race with other cancellation attempts
141      future.cancel(true);
142      LOG.debug("Prefetch cancelled for {}", path);
143    }
144  }
145
146  private PrefetchExecutor() {
147  }
148
149  public static boolean isCompleted(Path path) {
150    Future<?> future = prefetchFutures.get(path);
151    if (future != null) {
152      return future.isDone();
153    }
154    return true;
155  }
156
157  /* Visible for testing only */
158  @RestrictedApi(explanation = "Should only be called in tests", link = "",
159      allowedOnPath = ".*/src/test/.*")
160  static ScheduledExecutorService getExecutorPool() {
161    return prefetchExecutorPool;
162  }
163
164  @RestrictedApi(explanation = "Should only be called in tests", link = "",
165      allowedOnPath = ".*/src/test/.*")
166  static Map<Path, Future<?>> getPrefetchFutures() {
167    return prefetchFutures;
168  }
169
170  @RestrictedApi(explanation = "Should only be called in tests", link = "",
171      allowedOnPath = ".*/src/test/.*")
172  static Map<Path, Runnable> getPrefetchRunnable() {
173    return prefetchRunnable;
174  }
175
176  static boolean isPrefetchStarted() {
177    AtomicBoolean prefetchStarted = new AtomicBoolean(false);
178    for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
179      Path k = entry.getKey();
180      Future<?> v = entry.getValue();
181      ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
182      long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
183      if (waitTime < 0) {
184        // At this point prefetch is started
185        prefetchStarted.set(true);
186        break;
187      }
188    }
189    return prefetchStarted.get();
190  }
191
192  public static int getPrefetchDelay() {
193    return prefetchDelayMillis;
194  }
195
196  public static void loadConfiguration(Configuration conf) {
197    prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
198    prefetchDelayVariation =
199      conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
200    prefetchFutures.forEach((k, v) -> {
201      ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
202      if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) {
203        // the thread is still pending delay expiration and has not started to run yet, so can be
204        // re-scheduled at no cost.
205        interrupt(k);
206        request(k, prefetchRunnable.get(k));
207      }
208      LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k,
209        prefetchDelayMillis, prefetchDelayVariation);
210    });
211  }
212}