001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import java.util.Map; 021import java.util.concurrent.ConcurrentSkipListMap; 022import java.util.concurrent.Future; 023import java.util.concurrent.RejectedExecutionException; 024import java.util.concurrent.ScheduledExecutorService; 025import java.util.concurrent.ScheduledThreadPoolExecutor; 026import java.util.concurrent.ThreadFactory; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.TimeUnit; 029import java.util.regex.Pattern; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039@InterfaceAudience.Private 040public final class PrefetchExecutor { 041 042 private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class); 043 044 /** Futures for tracking block prefetch activity */ 045 private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); 046 /** Executor pool shared among all HFiles for block prefetch */ 047 private static final ScheduledExecutorService prefetchExecutorPool; 048 /** Delay before beginning prefetch */ 049 private static final int prefetchDelayMillis; 050 /** Variation in prefetch delay times, to mitigate stampedes */ 051 private static final float prefetchDelayVariation; 052 static { 053 // Consider doing this on demand with a configuration passed in rather 054 // than in a static initializer. 055 Configuration conf = HBaseConfiguration.create(); 056 // 1s here for tests, consider 30s in hbase-default.xml 057 // Set to 0 for no delay 058 prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000); 059 prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f); 060 int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4); 061 prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() { 062 @Override 063 public Thread newThread(Runnable r) { 064 String name = "hfile-prefetch-" + EnvironmentEdgeManager.currentTime(); 065 Thread t = new Thread(r, name); 066 t.setDaemon(true); 067 return t; 068 } 069 }); 070 } 071 072 // TODO: We want HFile, which is where the blockcache lives, to handle 073 // prefetching of file blocks but the Store level is where path convention 074 // knowledge should be contained 075 private static final Pattern prefetchPathExclude = 076 Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") 077 + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR 078 + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); 079 080 public static void request(Path path, Runnable runnable) { 081 if (!prefetchPathExclude.matcher(path.toString()).find()) { 082 long delay; 083 if (prefetchDelayMillis > 0) { 084 delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2))) 085 + (prefetchDelayMillis * (prefetchDelayVariation / 2) 086 * ThreadLocalRandom.current().nextFloat())); 087 } else { 088 delay = 0; 089 } 090 try { 091 if (LOG.isDebugEnabled()) { 092 LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms"); 093 } 094 prefetchFutures.put(path, 095 prefetchExecutorPool.schedule(runnable, delay, TimeUnit.MILLISECONDS)); 096 } catch (RejectedExecutionException e) { 097 prefetchFutures.remove(path); 098 LOG.warn("Prefetch request rejected for " + path); 099 } 100 } 101 } 102 103 public static void complete(Path path) { 104 prefetchFutures.remove(path); 105 if (LOG.isDebugEnabled()) { 106 LOG.debug("Prefetch completed for " + path); 107 } 108 } 109 110 public static void cancel(Path path) { 111 Future<?> future = prefetchFutures.get(path); 112 if (future != null) { 113 // ok to race with other cancellation attempts 114 future.cancel(true); 115 prefetchFutures.remove(path); 116 if (LOG.isDebugEnabled()) { 117 LOG.debug("Prefetch cancelled for " + path); 118 } 119 } 120 } 121 122 public static boolean isCompleted(Path path) { 123 Future<?> future = prefetchFutures.get(path); 124 if (future != null) { 125 return future.isDone(); 126 } 127 return true; 128 } 129 130 private PrefetchExecutor() { 131 } 132}