001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.util.Map;
021import java.util.concurrent.ConcurrentSkipListMap;
022import java.util.concurrent.Future;
023import java.util.concurrent.RejectedExecutionException;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ScheduledThreadPoolExecutor;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.TimeUnit;
029import java.util.regex.Pattern;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039@InterfaceAudience.Private
040public final class PrefetchExecutor {
041
042  private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
043
044  /** Futures for tracking block prefetch activity */
045  private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
046  /** Executor pool shared among all HFiles for block prefetch */
047  private static final ScheduledExecutorService prefetchExecutorPool;
048  /** Delay before beginning prefetch */
049  private static final int prefetchDelayMillis;
050  /** Variation in prefetch delay times, to mitigate stampedes */
051  private static final float prefetchDelayVariation;
052  static {
053    // Consider doing this on demand with a configuration passed in rather
054    // than in a static initializer.
055    Configuration conf = HBaseConfiguration.create();
056    // 1s here for tests, consider 30s in hbase-default.xml
057    // Set to 0 for no delay
058    prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
059    prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
060    int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
061    prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
062      @Override
063      public Thread newThread(Runnable r) {
064        String name = "hfile-prefetch-" + EnvironmentEdgeManager.currentTime();
065        Thread t = new Thread(r, name);
066        t.setDaemon(true);
067        return t;
068      }
069    });
070  }
071
072  // TODO: We want HFile, which is where the blockcache lives, to handle
073  // prefetching of file blocks but the Store level is where path convention
074  // knowledge should be contained
075  private static final Pattern prefetchPathExclude =
076    Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.")
077      + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR
078      + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
079
080  public static void request(Path path, Runnable runnable) {
081    if (!prefetchPathExclude.matcher(path.toString()).find()) {
082      long delay;
083      if (prefetchDelayMillis > 0) {
084        delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
085          + (prefetchDelayMillis * (prefetchDelayVariation / 2)
086            * ThreadLocalRandom.current().nextFloat()));
087      } else {
088        delay = 0;
089      }
090      try {
091        if (LOG.isDebugEnabled()) {
092          LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms");
093        }
094        prefetchFutures.put(path,
095          prefetchExecutorPool.schedule(runnable, delay, TimeUnit.MILLISECONDS));
096      } catch (RejectedExecutionException e) {
097        prefetchFutures.remove(path);
098        LOG.warn("Prefetch request rejected for " + path);
099      }
100    }
101  }
102
103  public static void complete(Path path) {
104    prefetchFutures.remove(path);
105    if (LOG.isDebugEnabled()) {
106      LOG.debug("Prefetch completed for " + path);
107    }
108  }
109
110  public static void cancel(Path path) {
111    Future<?> future = prefetchFutures.get(path);
112    if (future != null) {
113      // ok to race with other cancellation attempts
114      future.cancel(true);
115      prefetchFutures.remove(path);
116      if (LOG.isDebugEnabled()) {
117        LOG.debug("Prefetch cancelled for " + path);
118      }
119    }
120  }
121
122  public static boolean isCompleted(Path path) {
123    Future<?> future = prefetchFutures.get(path);
124    if (future != null) {
125      return future.isDone();
126    }
127    return true;
128  }
129
130  private PrefetchExecutor() {
131  }
132}