1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.io.hfile;
20
21 import java.util.Map;
22 import java.util.Random;
23 import java.util.concurrent.ConcurrentSkipListMap;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.TimeUnit;
30 import java.util.regex.Pattern;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HConstants;
39
40 public class PrefetchExecutor {
41
42 private static final Log LOG = LogFactory.getLog(PrefetchExecutor.class);
43
44
45 private static final Map<Path,Future<?>> prefetchFutures =
46 new ConcurrentSkipListMap<Path,Future<?>>();
47
48 private static final ScheduledExecutorService prefetchExecutorPool;
49
50 private static final int prefetchDelayMillis;
51
52 private static final float prefetchDelayVariation;
53 static {
54
55
56 Configuration conf = HBaseConfiguration.create();
57
58
59 prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
60 prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
61 int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
62 prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads,
63 new ThreadFactory() {
64 @Override
65 public Thread newThread(Runnable r) {
66 Thread t = new Thread(r);
67 t.setName("hfile-prefetch-" + System.currentTimeMillis());
68 t.setDaemon(true);
69 return t;
70 }
71 });
72 }
73
74 private static final Random RNG = new Random();
75
76
77
78
79 private static final Pattern prefetchPathExclude =
80 Pattern.compile(
81 "(" +
82 Path.SEPARATOR_CHAR +
83 HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") +
84 Path.SEPARATOR_CHAR +
85 ")|(" +
86 Path.SEPARATOR_CHAR +
87 HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") +
88 Path.SEPARATOR_CHAR +
89 ")");
90
91 public static void request(Path path, Runnable runnable) {
92 if (!prefetchPathExclude.matcher(path.toString()).find()) {
93 long delay;
94 if (prefetchDelayMillis > 0) {
95 delay = (long)((prefetchDelayMillis * (1.0f - (prefetchDelayVariation/2))) +
96 (prefetchDelayMillis * (prefetchDelayVariation/2) * RNG.nextFloat()));
97 } else {
98 delay = 0;
99 }
100 try {
101 if (LOG.isDebugEnabled()) {
102 LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms");
103 }
104 prefetchFutures.put(path, prefetchExecutorPool.schedule(runnable, delay,
105 TimeUnit.MILLISECONDS));
106 } catch (RejectedExecutionException e) {
107 prefetchFutures.remove(path);
108 LOG.warn("Prefetch request rejected for " + path);
109 }
110 }
111 }
112
113 public static void complete(Path path) {
114 prefetchFutures.remove(path);
115 if (LOG.isDebugEnabled()) {
116 LOG.debug("Prefetch completed for " + path);
117 }
118 }
119
120 public static void cancel(Path path) {
121 Future<?> future = prefetchFutures.get(path);
122 if (future != null) {
123
124 future.cancel(true);
125 prefetchFutures.remove(path);
126 if (LOG.isDebugEnabled()) {
127 LOG.debug("Prefetch cancelled for " + path);
128 }
129 }
130 }
131
132 public static boolean isCompleted(Path path) {
133 Future<?> future = prefetchFutures.get(path);
134 if (future != null) {
135 return future.isDone();
136 }
137 return true;
138 }
139
140 }