View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.LinkedHashMap;
24  import java.util.Map.Entry;
25  import java.util.concurrent.ScheduledFuture;
26  import java.util.concurrent.ScheduledThreadPoolExecutor;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  
35  /**
36   * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
37   * periodically while sharing threads. The ChoreService is backed by a
38   * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
39   * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
40   * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
41   * <p>
42   * The ChoreService provides the ability to schedule, cancel, and trigger instances of
43   * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
44   * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
45   * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
46   * there may be a need to increase the number of threads if it is noticed that chores are no longer
47   * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
48   * made to reduce the number of running threads to see if chores can still meet their start times
49   * with a smaller thread pool.
50   * <p>
51   * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
52   * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
53   */
54  @InterfaceAudience.Private
55  public class ChoreService implements ChoreServicer {
56    private final Log LOG = LogFactory.getLog(this.getClass());
57  
58    /**
59     * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
60     */
61    public final static int MIN_CORE_POOL_SIZE = 1;
62  
63    /**
64     * This thread pool is used to schedule all of the Chores
65     */
66    private final ScheduledThreadPoolExecutor scheduler;
67  
68    /**
69     * Maps chores to their futures. Futures are used to control a chore's schedule
70     */
71    private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
72  
73    /**
74     * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
75     * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
76     * increase the core pool size by 1 (otherwise a single long running chore whose execution is
77     * longer than its period would be able to spawn too many threads).
78     */
79    private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
80  
81    /**
82     * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
83     * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
84     * running on. The prefix is useful because it allows us to monitor how the thread pool of a
85     * particular service changes over time VIA thread dumps.
86     */
87    private final String coreThreadPoolPrefix;
88  
89    /**
90     * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
91     *          spawned by this service
92     */
93    public ChoreService(final String coreThreadPoolPrefix) {
94      this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE);
95    }
96  
97    /**
98     * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
99     *          spawned by this service
100    * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor 
101    *          to during initialization. The default size is 1, but specifying a larger size may be
102    *          beneficial if you know that 1 thread will not be enough.
103    */
104   public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) {
105     this.coreThreadPoolPrefix = coreThreadPoolPrefix;
106     if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE;
107     final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
108     scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
109     scheduler.setRemoveOnCancelPolicy(true);
110     scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
111     choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
112   }
113 
114   /**
115    * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
116    *          spawned by this service
117    */
118   public static ChoreService getInstance(final String coreThreadPoolPrefix) {
119     return new ChoreService(coreThreadPoolPrefix);
120   }
121 
122   /**
123    * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
124    *          instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
125    *          with a single ChoreService instance).
126    * @return true when the chore was successfully scheduled. false when the scheduling failed
127    *         (typically occurs when a chore is scheduled during shutdown of service)
128    */
129   public synchronized boolean scheduleChore(ScheduledChore chore) {
130     if (chore == null) return false;
131 
132     try {
133       chore.setChoreServicer(this);
134       ScheduledFuture<?> future =
135           scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
136             chore.getTimeUnit());
137       scheduledChores.put(chore, future);
138       return true;
139     } catch (Exception exception) {
140       if (LOG.isInfoEnabled()) {
141         LOG.info("Could not successfully schedule chore: " + chore.getName());
142       }
143       return false;
144     }
145   }
146 
147   /**
148    * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
149    *          yet then this call is equivalent to a call to scheduleChore.
150    */
151   private synchronized void rescheduleChore(ScheduledChore chore) {
152     if (chore == null) return;
153 
154     if (scheduledChores.containsKey(chore)) {
155       ScheduledFuture<?> future = scheduledChores.get(chore);
156       future.cancel(false);
157     }
158     scheduleChore(chore);
159   }
160 
161   @Override
162   public synchronized void cancelChore(ScheduledChore chore) {
163     cancelChore(chore, true);
164   }
165 
166   @Override
167   public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
168     if (chore != null && scheduledChores.containsKey(chore)) {
169       ScheduledFuture<?> future = scheduledChores.get(chore);
170       future.cancel(mayInterruptIfRunning);
171       scheduledChores.remove(chore);
172 
173       // Removing a chore that was missing its start time means it may be possible
174       // to reduce the number of threads
175       if (choresMissingStartTime.containsKey(chore)) {
176         choresMissingStartTime.remove(chore);
177         requestCorePoolDecrease();
178       }
179     }
180   }
181 
182   @Override
183   public synchronized boolean isChoreScheduled(ScheduledChore chore) {
184     return chore != null && scheduledChores.containsKey(chore)
185         && !scheduledChores.get(chore).isDone();
186   }
187 
188   @Override
189   public synchronized boolean triggerNow(ScheduledChore chore) {
190     if (chore == null) {
191       return false;
192     } else {
193       rescheduleChore(chore);
194       return true;
195     }
196   }
197 
198   /**
199    * @return number of chores that this service currently has scheduled
200    */
201   int getNumberOfScheduledChores() {
202     return scheduledChores.size();
203   }
204 
205   /**
206    * @return number of chores that this service currently has scheduled that are missing their
207    *         scheduled start time
208    */
209   int getNumberOfChoresMissingStartTime() {
210     return choresMissingStartTime.size();
211   }
212 
213   /**
214    * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
215    */
216   int getCorePoolSize() {
217     return scheduler.getCorePoolSize();
218   }
219 
220   /**
221    * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
222    * daemon threads, and thus, don't prevent the JVM from shutting down
223    */
224   static class ChoreServiceThreadFactory implements ThreadFactory {
225     private final String threadPrefix;
226     private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
227     private AtomicInteger threadNumber = new AtomicInteger(1);
228 
229     /**
230      * @param threadPrefix The prefix given to all threads created by this factory
231      */
232     public ChoreServiceThreadFactory(final String threadPrefix) {
233       this.threadPrefix = threadPrefix;
234     }
235 
236     @Override
237     public Thread newThread(Runnable r) {
238       Thread thread =
239           new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
240       thread.setDaemon(true);
241       return thread;
242     }
243   }
244 
245   /**
246    * Represents a request to increase the number of core pool threads. Typically a request
247    * originates from the fact that the current core pool size is not sufficient to service all of
248    * the currently running Chores
249    * @return true when the request to increase the core pool size succeeds
250    */
251   private synchronized boolean requestCorePoolIncrease() {
252     // There is no point in creating more threads than scheduledChores.size since scheduled runs
253     // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
254     // amongst occurrences of the same chore).
255     if (scheduler.getCorePoolSize() < scheduledChores.size()) {
256       scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
257       printChoreServiceDetails("requestCorePoolIncrease");
258       return true;
259     }
260     return false;
261   }
262 
263   /**
264    * Represents a request to decrease the number of core pool threads. Typically a request
265    * originates from the fact that the current core pool size is more than sufficient to service the
266    * running Chores.
267    */
268   private synchronized void requestCorePoolDecrease() {
269     if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
270       scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
271       printChoreServiceDetails("requestCorePoolDecrease");
272     }
273   }
274 
275   @Override
276   public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
277     if (chore == null || !scheduledChores.containsKey(chore)) return;
278 
279     // If the chore has not caused an increase in the size of the core thread pool then request an
280     // increase. This allows each chore missing its start time to increase the core pool size by
281     // at most 1.
282     if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
283       choresMissingStartTime.put(chore, requestCorePoolIncrease());
284     }
285 
286     // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
287     // the chore is NOT rescheduled, future executions of this chore will be delayed more and
288     // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
289     // idle threads to chores based on how delayed they are.
290     rescheduleChore(chore);
291     printChoreDetails("onChoreMissedStartTime", chore);
292   }
293 
294   /**
295    * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
296    * in the middle of execution will be interrupted and shutdown. This service will be unusable
297    * after this method has been called (i.e. future scheduling attempts will fail).
298    */
299   public synchronized void shutdown() {
300     scheduler.shutdownNow();
301     if (LOG.isInfoEnabled()) {
302       LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
303           + " on shutdown");
304     }
305     cancelAllChores(true);
306     scheduledChores.clear();
307     choresMissingStartTime.clear();
308   }
309   
310   /**
311    * @return true when the service is shutdown and thus cannot be used anymore
312    */
313   public boolean isShutdown() {
314     return scheduler.isShutdown();
315   }
316 
317   /**
318    * @return true when the service is shutdown and all threads have terminated
319    */
320   public boolean isTerminated() {
321     return scheduler.isTerminated();
322   }
323 
324   private void cancelAllChores(final boolean mayInterruptIfRunning) {
325     ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>();
326     // Build list of chores to cancel so we can iterate through a set that won't change
327     // as chores are cancelled. If we tried to cancel each chore while iterating through
328     // keySet the results would be undefined because the keySet would be changing
329     for (ScheduledChore chore : scheduledChores.keySet()) {
330       choresToCancel.add(chore);
331     }
332     for (ScheduledChore chore : choresToCancel) {
333       cancelChore(chore, mayInterruptIfRunning);
334     }
335     choresToCancel.clear();
336   }
337 
338   /**
339    * Prints a summary of important details about the chore. Used for debugging purposes
340    */
341   private void printChoreDetails(final String header, ScheduledChore chore) {
342     LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
343     output.put(header, "");
344     output.put("Chore name: ", chore.getName());
345     output.put("Chore period: ", Integer.toString(chore.getPeriod()));
346     output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
347 
348     for (Entry<String, String> entry : output.entrySet()) {
349       if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
350     }
351   }
352 
353   /**
354    * Prints a summary of important details about the service. Used for debugging purposes
355    */
356   private void printChoreServiceDetails(final String header) {
357     LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
358     output.put(header, "");
359     output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
360     output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
361     output.put("ChoreService missingStartTimeCount: ",
362       Integer.toString(getNumberOfChoresMissingStartTime()));
363 
364     for (Entry<String, String> entry : output.entrySet()) {
365       if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
366     }
367   }
368 }