001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import java.util.Map; 021import java.util.concurrent.ConcurrentSkipListMap; 022import java.util.concurrent.Future; 023import java.util.concurrent.RejectedExecutionException; 024import java.util.concurrent.ScheduledExecutorService; 025import java.util.concurrent.ScheduledThreadPoolExecutor; 026import java.util.concurrent.ThreadFactory; 027import java.util.concurrent.ThreadLocalRandom; 028import java.util.concurrent.TimeUnit; 029import java.util.regex.Pattern; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.trace.TraceUtil; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040@InterfaceAudience.Private 041public final class PrefetchExecutor { 042 043 private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class); 044 045 /** Futures for tracking block prefetch activity */ 046 private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); 047 /** Executor pool shared among all HFiles for block prefetch */ 048 private static final ScheduledExecutorService prefetchExecutorPool; 049 /** Delay before beginning prefetch */ 050 private static final int prefetchDelayMillis; 051 /** Variation in prefetch delay times, to mitigate stampedes */ 052 private static final float prefetchDelayVariation; 053 static { 054 // Consider doing this on demand with a configuration passed in rather 055 // than in a static initializer. 056 Configuration conf = HBaseConfiguration.create(); 057 // 1s here for tests, consider 30s in hbase-default.xml 058 // Set to 0 for no delay 059 prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000); 060 prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f); 061 int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4); 062 prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() { 063 @Override 064 public Thread newThread(Runnable r) { 065 String name = "hfile-prefetch-" + EnvironmentEdgeManager.currentTime(); 066 Thread t = new Thread(r, name); 067 t.setDaemon(true); 068 return t; 069 } 070 }); 071 } 072 073 // TODO: We want HFile, which is where the blockcache lives, to handle 074 // prefetching of file blocks but the Store level is where path convention 075 // knowledge should be contained 076 private static final Pattern prefetchPathExclude = 077 Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") 078 + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR 079 + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); 080 081 public static void request(Path path, Runnable runnable) { 082 if (!prefetchPathExclude.matcher(path.toString()).find()) { 083 long delay; 084 if (prefetchDelayMillis > 0) { 085 delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2))) 086 + (prefetchDelayMillis * (prefetchDelayVariation / 2) 087 * ThreadLocalRandom.current().nextFloat())); 088 } else { 089 delay = 0; 090 } 091 try { 092 LOG.debug("Prefetch requested for {}, delay={} ms", path, delay); 093 final Runnable tracedRunnable = 094 TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request"); 095 final Future<?> future = 096 prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS); 097 prefetchFutures.put(path, future); 098 } catch (RejectedExecutionException e) { 099 prefetchFutures.remove(path); 100 LOG.warn("Prefetch request rejected for {}", path); 101 } 102 } 103 } 104 105 public static void complete(Path path) { 106 prefetchFutures.remove(path); 107 LOG.debug("Prefetch completed for {}", path); 108 } 109 110 public static void cancel(Path path) { 111 Future<?> future = prefetchFutures.get(path); 112 if (future != null) { 113 // ok to race with other cancellation attempts 114 future.cancel(true); 115 prefetchFutures.remove(path); 116 LOG.debug("Prefetch cancelled for {}", path); 117 } 118 } 119 120 public static boolean isCompleted(Path path) { 121 Future<?> future = prefetchFutures.get(path); 122 if (future != null) { 123 return future.isDone(); 124 } 125 return true; 126 } 127 128 private PrefetchExecutor() { 129 } 130}