001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.io.hfile; 020 021import java.util.Map; 022import java.util.Random; 023import java.util.concurrent.ConcurrentSkipListMap; 024import java.util.concurrent.Future; 025import java.util.concurrent.RejectedExecutionException; 026import java.util.concurrent.ScheduledExecutorService; 027import java.util.concurrent.ScheduledThreadPoolExecutor; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.TimeUnit; 030import java.util.regex.Pattern; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039@InterfaceAudience.Private 040public final class PrefetchExecutor { 041 042 private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class); 043 044 /** Futures for tracking block prefetch activity */ 045 private static final Map<Path,Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); 046 /** Executor pool shared among all HFiles for block prefetch */ 047 private static final ScheduledExecutorService prefetchExecutorPool; 048 /** Delay before beginning prefetch */ 049 private static final int prefetchDelayMillis; 050 /** Variation in prefetch delay times, to mitigate stampedes */ 051 private static final float prefetchDelayVariation; 052 static { 053 // Consider doing this on demand with a configuration passed in rather 054 // than in a static initializer. 055 Configuration conf = HBaseConfiguration.create(); 056 // 1s here for tests, consider 30s in hbase-default.xml 057 // Set to 0 for no delay 058 prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000); 059 prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f); 060 int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4); 061 prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, 062 new ThreadFactory() { 063 @Override 064 public Thread newThread(Runnable r) { 065 String name = "hfile-prefetch-" + System.currentTimeMillis(); 066 Thread t = new Thread(r, name); 067 t.setDaemon(true); 068 return t; 069 } 070 }); 071 } 072 073 private static final Random RNG = new Random(); 074 075 // TODO: We want HFile, which is where the blockcache lives, to handle 076 // prefetching of file blocks but the Store level is where path convention 077 // knowledge should be contained 078 private static final Pattern prefetchPathExclude = 079 Pattern.compile( 080 "(" + 081 Path.SEPARATOR_CHAR + 082 HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") + 083 Path.SEPARATOR_CHAR + 084 ")|(" + 085 Path.SEPARATOR_CHAR + 086 HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + 087 Path.SEPARATOR_CHAR + 088 ")"); 089 090 public static void request(Path path, Runnable runnable) { 091 if (!prefetchPathExclude.matcher(path.toString()).find()) { 092 long delay; 093 if (prefetchDelayMillis > 0) { 094 delay = (long)((prefetchDelayMillis * (1.0f - (prefetchDelayVariation/2))) + 095 (prefetchDelayMillis * (prefetchDelayVariation/2) * RNG.nextFloat())); 096 } else { 097 delay = 0; 098 } 099 try { 100 if (LOG.isDebugEnabled()) { 101 LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms"); 102 } 103 prefetchFutures.put(path, prefetchExecutorPool.schedule(runnable, delay, 104 TimeUnit.MILLISECONDS)); 105 } catch (RejectedExecutionException e) { 106 prefetchFutures.remove(path); 107 LOG.warn("Prefetch request rejected for " + path); 108 } 109 } 110 } 111 112 public static void complete(Path path) { 113 prefetchFutures.remove(path); 114 if (LOG.isDebugEnabled()) { 115 LOG.debug("Prefetch completed for " + path); 116 } 117 } 118 119 public static void cancel(Path path) { 120 Future<?> future = prefetchFutures.get(path); 121 if (future != null) { 122 // ok to race with other cancellation attempts 123 future.cancel(true); 124 prefetchFutures.remove(path); 125 if (LOG.isDebugEnabled()) { 126 LOG.debug("Prefetch cancelled for " + path); 127 } 128 } 129 } 130 131 public static boolean isCompleted(Path path) { 132 Future<?> future = prefetchFutures.get(path); 133 if (future != null) { 134 return future.isDone(); 135 } 136 return true; 137 } 138 139 private PrefetchExecutor() {} 140}