001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile;
020
021import java.util.Map;
022import java.util.Random;
023import java.util.concurrent.ConcurrentSkipListMap;
024import java.util.concurrent.Future;
025import java.util.concurrent.RejectedExecutionException;
026import java.util.concurrent.ScheduledExecutorService;
027import java.util.concurrent.ScheduledThreadPoolExecutor;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.TimeUnit;
030import java.util.regex.Pattern;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039@InterfaceAudience.Private
040public final class PrefetchExecutor {
041
042  private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
043
044  /** Futures for tracking block prefetch activity */
045  private static final Map<Path,Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
046  /** Executor pool shared among all HFiles for block prefetch */
047  private static final ScheduledExecutorService prefetchExecutorPool;
048  /** Delay before beginning prefetch */
049  private static final int prefetchDelayMillis;
050  /** Variation in prefetch delay times, to mitigate stampedes */
051  private static final float prefetchDelayVariation;
052  static {
053    // Consider doing this on demand with a configuration passed in rather
054    // than in a static initializer.
055    Configuration conf = HBaseConfiguration.create();
056    // 1s here for tests, consider 30s in hbase-default.xml
057    // Set to 0 for no delay
058    prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
059    prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
060    int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
061    prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads,
062      new ThreadFactory() {
063        @Override
064        public Thread newThread(Runnable r) {
065          String name = "hfile-prefetch-" + System.currentTimeMillis();
066          Thread t = new Thread(r, name);
067          t.setDaemon(true);
068          return t;
069        }
070    });
071  }
072
073  private static final Random RNG = new Random();
074
075  // TODO: We want HFile, which is where the blockcache lives, to handle
076  // prefetching of file blocks but the Store level is where path convention
077  // knowledge should be contained
078  private static final Pattern prefetchPathExclude =
079      Pattern.compile(
080        "(" +
081          Path.SEPARATOR_CHAR +
082            HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") +
083            Path.SEPARATOR_CHAR +
084        ")|(" +
085          Path.SEPARATOR_CHAR +
086            HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") +
087            Path.SEPARATOR_CHAR +
088        ")");
089
090  public static void request(Path path, Runnable runnable) {
091    if (!prefetchPathExclude.matcher(path.toString()).find()) {
092      long delay;
093      if (prefetchDelayMillis > 0) {
094        delay = (long)((prefetchDelayMillis * (1.0f - (prefetchDelayVariation/2))) +
095        (prefetchDelayMillis * (prefetchDelayVariation/2) * RNG.nextFloat()));
096      } else {
097        delay = 0;
098      }
099      try {
100        if (LOG.isDebugEnabled()) {
101          LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms");
102        }
103        prefetchFutures.put(path, prefetchExecutorPool.schedule(runnable, delay,
104          TimeUnit.MILLISECONDS));
105      } catch (RejectedExecutionException e) {
106        prefetchFutures.remove(path);
107        LOG.warn("Prefetch request rejected for " + path);
108      }
109    }
110  }
111
112  public static void complete(Path path) {
113    prefetchFutures.remove(path);
114    if (LOG.isDebugEnabled()) {
115      LOG.debug("Prefetch completed for " + path);
116    }
117  }
118
119  public static void cancel(Path path) {
120    Future<?> future = prefetchFutures.get(path);
121    if (future != null) {
122      // ok to race with other cancellation attempts
123      future.cancel(true);
124      prefetchFutures.remove(path);
125      if (LOG.isDebugEnabled()) {
126        LOG.debug("Prefetch cancelled for " + path);
127      }
128    }
129  }
130
131  public static boolean isCompleted(Path path) {
132    Future<?> future = prefetchFutures.get(path);
133    if (future != null) {
134      return future.isDone();
135    }
136    return true;
137  }
138
139  private PrefetchExecutor() {}
140}