001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.util.Map;
021import java.util.concurrent.ConcurrentSkipListMap;
022import java.util.concurrent.Future;
023import java.util.concurrent.RejectedExecutionException;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ScheduledThreadPoolExecutor;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.TimeUnit;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.trace.TraceUtil;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040@InterfaceAudience.Private
041public final class PrefetchExecutor {
042
043  private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
044
045  /** Futures for tracking block prefetch activity */
046  private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
047  /** Executor pool shared among all HFiles for block prefetch */
048  private static final ScheduledExecutorService prefetchExecutorPool;
049  /** Delay before beginning prefetch */
050  private static final int prefetchDelayMillis;
051  /** Variation in prefetch delay times, to mitigate stampedes */
052  private static final float prefetchDelayVariation;
053  static {
054    // Consider doing this on demand with a configuration passed in rather
055    // than in a static initializer.
056    Configuration conf = HBaseConfiguration.create();
057    // 1s here for tests, consider 30s in hbase-default.xml
058    // Set to 0 for no delay
059    prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
060    prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
061    int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
062    prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
063      @Override
064      public Thread newThread(Runnable r) {
065        String name = "hfile-prefetch-" + EnvironmentEdgeManager.currentTime();
066        Thread t = new Thread(r, name);
067        t.setDaemon(true);
068        return t;
069      }
070    });
071  }
072
073  // TODO: We want HFile, which is where the blockcache lives, to handle
074  // prefetching of file blocks but the Store level is where path convention
075  // knowledge should be contained
076  private static final Pattern prefetchPathExclude =
077    Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.")
078      + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR
079      + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
080
081  public static void request(Path path, Runnable runnable) {
082    if (!prefetchPathExclude.matcher(path.toString()).find()) {
083      long delay;
084      if (prefetchDelayMillis > 0) {
085        delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
086          + (prefetchDelayMillis * (prefetchDelayVariation / 2)
087            * ThreadLocalRandom.current().nextFloat()));
088      } else {
089        delay = 0;
090      }
091      try {
092        LOG.debug("Prefetch requested for {}, delay={} ms", path, delay);
093        final Runnable tracedRunnable =
094          TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request");
095        final Future<?> future =
096          prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
097        prefetchFutures.put(path, future);
098      } catch (RejectedExecutionException e) {
099        prefetchFutures.remove(path);
100        LOG.warn("Prefetch request rejected for {}", path);
101      }
102    }
103  }
104
105  public static void complete(Path path) {
106    prefetchFutures.remove(path);
107    if (LOG.isDebugEnabled()) {
108      LOG.debug("Prefetch completed for {}", path.getName());
109    }
110  }
111
112  public static void cancel(Path path) {
113    Future<?> future = prefetchFutures.get(path);
114    if (future != null) {
115      // ok to race with other cancellation attempts
116      future.cancel(true);
117      prefetchFutures.remove(path);
118      LOG.debug("Prefetch cancelled for {}", path);
119    }
120  }
121
122  public static boolean isCompleted(Path path) {
123    Future<?> future = prefetchFutures.get(path);
124    if (future != null) {
125      return future.isDone();
126    }
127    return true;
128  }
129
130  private PrefetchExecutor() {
131  }
132
133  /* Visible for testing only */
134  static ScheduledExecutorService getExecutorPool() {
135    return prefetchExecutorPool;
136  }
137}