View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.LinkedHashMap;
24  import java.util.Map.Entry;
25  import java.util.concurrent.ScheduledFuture;
26  import java.util.concurrent.ScheduledThreadPoolExecutor;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import com.google.common.annotations.VisibleForTesting;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  
36  /**
37   * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
38   * periodically while sharing threads. The ChoreService is backed by a
39   * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
40   * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
41   * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
42   * <p>
43   * The ChoreService provides the ability to schedule, cancel, and trigger instances of
44   * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
45   * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
46   * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
47   * there may be a need to increase the number of threads if it is noticed that chores are no longer
48   * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
49   * made to reduce the number of running threads to see if chores can still meet their start times
50   * with a smaller thread pool.
51   * <p>
52   * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
53   * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
54   */
55  @InterfaceAudience.Private
56  public class ChoreService implements ChoreServicer {
57    private static final Log LOG = LogFactory.getLog(ChoreService.class);
58  
59    /**
60     * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
61     */
62    public final static int MIN_CORE_POOL_SIZE = 1;
63  
64    /**
65     * This thread pool is used to schedule all of the Chores
66     */
67    private final ScheduledThreadPoolExecutor scheduler;
68  
69    /**
70     * Maps chores to their futures. Futures are used to control a chore's schedule
71     */
72    private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
73  
74    /**
75     * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
76     * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
77     * increase the core pool size by 1 (otherwise a single long running chore whose execution is
78     * longer than its period would be able to spawn too many threads).
79     */
80    private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
81  
82    /**
83     * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
84     * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
85     * running on. The prefix is useful because it allows us to monitor how the thread pool of a
86     * particular service changes over time VIA thread dumps.
87     */
88    private final String coreThreadPoolPrefix;
89  
90    /**
91     *
92     * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
93     *          spawned by this service
94     */
95    @VisibleForTesting
96    public ChoreService(final String coreThreadPoolPrefix) {
97      this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
98    }
99  
100   /**
101    * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
102    *               to true this will add -10% to 10% jitter.
103    */
104   public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
105     this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
106   }
107 
108   /**
109    * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
110    *          spawned by this service
111    * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor 
112    *          to during initialization. The default size is 1, but specifying a larger size may be
113    *          beneficial if you know that 1 thread will not be enough.
114    */
115   public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
116     this.coreThreadPoolPrefix = coreThreadPoolPrefix;
117     if (corePoolSize < MIN_CORE_POOL_SIZE)  {
118       corePoolSize = MIN_CORE_POOL_SIZE;
119     }
120 
121     final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
122     if (jitter) {
123       scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
124     } else {
125       scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
126     }
127 
128     scheduler.setRemoveOnCancelPolicy(true);
129     scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
130     choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
131   }
132 
133   /**
134    * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
135    *          spawned by this service
136    */
137   public static ChoreService getInstance(final String coreThreadPoolPrefix) {
138     return new ChoreService(coreThreadPoolPrefix);
139   }
140 
141   /**
142    * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
143    *          instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
144    *          with a single ChoreService instance).
145    * @return true when the chore was successfully scheduled. false when the scheduling failed
146    *         (typically occurs when a chore is scheduled during shutdown of service)
147    */
148   public synchronized boolean scheduleChore(ScheduledChore chore) {
149     if (chore == null) {
150       return false;
151     }
152 
153     try {
154       chore.setChoreServicer(this);
155       ScheduledFuture<?> future =
156           scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
157             chore.getTimeUnit());
158       scheduledChores.put(chore, future);
159       return true;
160     } catch (Exception exception) {
161       if (LOG.isInfoEnabled()) {
162         LOG.info("Could not successfully schedule chore: " + chore.getName());
163       }
164       return false;
165     }
166   }
167 
168   /**
169    * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
170    *          yet then this call is equivalent to a call to scheduleChore.
171    */
172   private synchronized void rescheduleChore(ScheduledChore chore) {
173     if (chore == null) return;
174 
175     if (scheduledChores.containsKey(chore)) {
176       ScheduledFuture<?> future = scheduledChores.get(chore);
177       future.cancel(false);
178     }
179     scheduleChore(chore);
180   }
181 
182   @Override
183   public synchronized void cancelChore(ScheduledChore chore) {
184     cancelChore(chore, true);
185   }
186 
187   @Override
188   public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
189     if (chore != null && scheduledChores.containsKey(chore)) {
190       ScheduledFuture<?> future = scheduledChores.get(chore);
191       future.cancel(mayInterruptIfRunning);
192       scheduledChores.remove(chore);
193 
194       // Removing a chore that was missing its start time means it may be possible
195       // to reduce the number of threads
196       if (choresMissingStartTime.containsKey(chore)) {
197         choresMissingStartTime.remove(chore);
198         requestCorePoolDecrease();
199       }
200     }
201   }
202 
203   @Override
204   public synchronized boolean isChoreScheduled(ScheduledChore chore) {
205     return chore != null && scheduledChores.containsKey(chore)
206         && !scheduledChores.get(chore).isDone();
207   }
208 
209   @Override
210   public synchronized boolean triggerNow(ScheduledChore chore) {
211     if (chore == null) {
212       return false;
213     } else {
214       rescheduleChore(chore);
215       return true;
216     }
217   }
218 
219   /**
220    * @return number of chores that this service currently has scheduled
221    */
222   int getNumberOfScheduledChores() {
223     return scheduledChores.size();
224   }
225 
226   /**
227    * @return number of chores that this service currently has scheduled that are missing their
228    *         scheduled start time
229    */
230   int getNumberOfChoresMissingStartTime() {
231     return choresMissingStartTime.size();
232   }
233 
234   /**
235    * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
236    */
237   int getCorePoolSize() {
238     return scheduler.getCorePoolSize();
239   }
240 
241   /**
242    * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
243    * daemon threads, and thus, don't prevent the JVM from shutting down
244    */
245   static class ChoreServiceThreadFactory implements ThreadFactory {
246     private final String threadPrefix;
247     private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
248     private AtomicInteger threadNumber = new AtomicInteger(1);
249 
250     /**
251      * @param threadPrefix The prefix given to all threads created by this factory
252      */
253     public ChoreServiceThreadFactory(final String threadPrefix) {
254       this.threadPrefix = threadPrefix;
255     }
256 
257     @Override
258     public Thread newThread(Runnable r) {
259       Thread thread =
260           new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
261       thread.setDaemon(true);
262       return thread;
263     }
264   }
265 
266   /**
267    * Represents a request to increase the number of core pool threads. Typically a request
268    * originates from the fact that the current core pool size is not sufficient to service all of
269    * the currently running Chores
270    * @return true when the request to increase the core pool size succeeds
271    */
272   private synchronized boolean requestCorePoolIncrease() {
273     // There is no point in creating more threads than scheduledChores.size since scheduled runs
274     // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
275     // amongst occurrences of the same chore).
276     if (scheduler.getCorePoolSize() < scheduledChores.size()) {
277       scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
278       printChoreServiceDetails("requestCorePoolIncrease");
279       return true;
280     }
281     return false;
282   }
283 
284   /**
285    * Represents a request to decrease the number of core pool threads. Typically a request
286    * originates from the fact that the current core pool size is more than sufficient to service the
287    * running Chores.
288    */
289   private synchronized void requestCorePoolDecrease() {
290     if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
291       scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
292       printChoreServiceDetails("requestCorePoolDecrease");
293     }
294   }
295 
296   @Override
297   public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
298     if (chore == null || !scheduledChores.containsKey(chore)) return;
299 
300     // If the chore has not caused an increase in the size of the core thread pool then request an
301     // increase. This allows each chore missing its start time to increase the core pool size by
302     // at most 1.
303     if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
304       choresMissingStartTime.put(chore, requestCorePoolIncrease());
305     }
306 
307     // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
308     // the chore is NOT rescheduled, future executions of this chore will be delayed more and
309     // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
310     // idle threads to chores based on how delayed they are.
311     rescheduleChore(chore);
312     printChoreDetails("onChoreMissedStartTime", chore);
313   }
314 
315   /**
316    * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
317    * in the middle of execution will be interrupted and shutdown. This service will be unusable
318    * after this method has been called (i.e. future scheduling attempts will fail).
319    */
320   public synchronized void shutdown() {
321     scheduler.shutdownNow();
322     if (LOG.isInfoEnabled()) {
323       LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
324           + " on shutdown");
325     }
326     cancelAllChores(true);
327     scheduledChores.clear();
328     choresMissingStartTime.clear();
329   }
330   
331   /**
332    * @return true when the service is shutdown and thus cannot be used anymore
333    */
334   public boolean isShutdown() {
335     return scheduler.isShutdown();
336   }
337 
338   /**
339    * @return true when the service is shutdown and all threads have terminated
340    */
341   public boolean isTerminated() {
342     return scheduler.isTerminated();
343   }
344 
345   private void cancelAllChores(final boolean mayInterruptIfRunning) {
346     ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>();
347     // Build list of chores to cancel so we can iterate through a set that won't change
348     // as chores are cancelled. If we tried to cancel each chore while iterating through
349     // keySet the results would be undefined because the keySet would be changing
350     for (ScheduledChore chore : scheduledChores.keySet()) {
351       choresToCancel.add(chore);
352     }
353     for (ScheduledChore chore : choresToCancel) {
354       cancelChore(chore, mayInterruptIfRunning);
355     }
356     choresToCancel.clear();
357   }
358 
359   /**
360    * Prints a summary of important details about the chore. Used for debugging purposes
361    */
362   private void printChoreDetails(final String header, ScheduledChore chore) {
363     LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
364     output.put(header, "");
365     output.put("Chore name: ", chore.getName());
366     output.put("Chore period: ", Integer.toString(chore.getPeriod()));
367     output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
368 
369     for (Entry<String, String> entry : output.entrySet()) {
370       if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
371     }
372   }
373 
374   /**
375    * Prints a summary of important details about the service. Used for debugging purposes
376    */
377   private void printChoreServiceDetails(final String header) {
378     LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
379     output.put(header, "");
380     output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
381     output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
382     output.put("ChoreService missingStartTimeCount: ",
383       Integer.toString(getNumberOfChoresMissingStartTime()));
384 
385     for (Entry<String, String> entry : output.entrySet()) {
386       if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
387     }
388   }
389 }