View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.util.Map;
22  import java.util.Random;
23  import java.util.concurrent.ConcurrentSkipListMap;
24  import java.util.concurrent.Future;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.ScheduledExecutorService;
27  import java.util.concurrent.ScheduledThreadPoolExecutor;
28  import java.util.concurrent.ThreadFactory;
29  import java.util.concurrent.TimeUnit;
30  import java.util.regex.Pattern;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.hbase.HBaseConfiguration;
38  import org.apache.hadoop.hbase.HConstants;
39  
40  public class PrefetchExecutor {
41  
42    private static final Log LOG = LogFactory.getLog(PrefetchExecutor.class);
43  
44    /** Futures for tracking block prefetch activity */
45    private static final Map<Path,Future<?>> prefetchFutures =
46      new ConcurrentSkipListMap<Path,Future<?>>();
47    /** Executor pool shared among all HFiles for block prefetch */
48    private static final ScheduledExecutorService prefetchExecutorPool;
49    /** Delay before beginning prefetch */
50    private static final int prefetchDelayMillis;
51    /** Variation in prefetch delay times, to mitigate stampedes */
52    private static final float prefetchDelayVariation;
53    static {
54      // Consider doing this on demand with a configuration passed in rather
55      // than in a static initializer.
56      Configuration conf = HBaseConfiguration.create();
57      // 1s here for tests, consider 30s in hbase-default.xml
58      // Set to 0 for no delay
59      prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
60      prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
61      int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
62      prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads,
63        new ThreadFactory() {
64          @Override
65          public Thread newThread(Runnable r) {
66            String name = "hfile-prefetch-" + System.currentTimeMillis();
67            Thread t = new Thread(r, name);
68            t.setDaemon(true);
69            return t;
70          }
71      });
72    }
73  
74    private static final Random RNG = new Random();
75  
76    // TODO: We want HFile, which is where the blockcache lives, to handle
77    // prefetching of file blocks but the Store level is where path convention
78    // knowledge should be contained
79    private static final Pattern prefetchPathExclude =
80        Pattern.compile(
81          "(" +
82            Path.SEPARATOR_CHAR +
83              HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") +
84              Path.SEPARATOR_CHAR +
85          ")|(" +
86            Path.SEPARATOR_CHAR +
87              HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") +
88              Path.SEPARATOR_CHAR +
89          ")");
90  
91    public static void request(Path path, Runnable runnable) {
92      if (!prefetchPathExclude.matcher(path.toString()).find()) {
93        long delay;
94        if (prefetchDelayMillis > 0) {
95          delay = (long)((prefetchDelayMillis * (1.0f - (prefetchDelayVariation/2))) +
96          (prefetchDelayMillis * (prefetchDelayVariation/2) * RNG.nextFloat()));
97        } else {
98          delay = 0;
99        }
100       try {
101         if (LOG.isDebugEnabled()) {
102           LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms");
103         }
104         prefetchFutures.put(path, prefetchExecutorPool.schedule(runnable, delay,
105           TimeUnit.MILLISECONDS));
106       } catch (RejectedExecutionException e) {
107         prefetchFutures.remove(path);
108         LOG.warn("Prefetch request rejected for " + path);
109       }
110     }
111   }
112 
113   public static void complete(Path path) {
114     prefetchFutures.remove(path);
115     if (LOG.isDebugEnabled()) {
116       LOG.debug("Prefetch completed for " + path);
117     }
118   }
119 
120   public static void cancel(Path path) {
121     Future<?> future = prefetchFutures.get(path);
122     if (future != null) {
123       // ok to race with other cancellation attempts
124       future.cancel(true);
125       prefetchFutures.remove(path);
126       if (LOG.isDebugEnabled()) {
127         LOG.debug("Prefetch cancelled for " + path);
128       }
129     }
130   }
131 
132   public static boolean isCompleted(Path path) {
133     Future<?> future = prefetchFutures.get(path);
134     if (future != null) {
135       return future.isDone(); 
136     }
137     return true;
138   }
139 
140 }