1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.LinkedHashMap;
24 import java.util.Map.Entry;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import com.google.common.annotations.VisibleForTesting;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35
36 /**
37 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
38 * periodically while sharing threads. The ChoreService is backed by a
39 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
40 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
41 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
42 * <p>
43 * The ChoreService provides the ability to schedule, cancel, and trigger instances of
44 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
45 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
46 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
47 * there may be a need to increase the number of threads if it is noticed that chores are no longer
48 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
49 * made to reduce the number of running threads to see if chores can still meet their start times
50 * with a smaller thread pool.
51 * <p>
52 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
53 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
54 */
55 @InterfaceAudience.Private
56 public class ChoreService implements ChoreServicer {
57 private static final Log LOG = LogFactory.getLog(ChoreService.class);
58
59 /**
60 * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
61 */
62 public final static int MIN_CORE_POOL_SIZE = 1;
63
64 /**
65 * This thread pool is used to schedule all of the Chores
66 */
67 private final ScheduledThreadPoolExecutor scheduler;
68
69 /**
70 * Maps chores to their futures. Futures are used to control a chore's schedule
71 */
72 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
73
74 /**
75 * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
76 * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
77 * increase the core pool size by 1 (otherwise a single long running chore whose execution is
78 * longer than its period would be able to spawn too many threads).
79 */
80 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
81
82 /**
83 * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
84 * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
85 * running on. The prefix is useful because it allows us to monitor how the thread pool of a
86 * particular service changes over time VIA thread dumps.
87 */
88 private final String coreThreadPoolPrefix;
89
90 /**
91 *
92 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
93 * spawned by this service
94 */
95 @VisibleForTesting
96 public ChoreService(final String coreThreadPoolPrefix) {
97 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
98 }
99
100 /**
101 * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
102 * to true this will add -10% to 10% jitter.
103 */
104 public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
105 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
106 }
107
108 /**
109 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
110 * spawned by this service
111 * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor
112 * to during initialization. The default size is 1, but specifying a larger size may be
113 * beneficial if you know that 1 thread will not be enough.
114 */
115 public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
116 this.coreThreadPoolPrefix = coreThreadPoolPrefix;
117 if (corePoolSize < MIN_CORE_POOL_SIZE) {
118 corePoolSize = MIN_CORE_POOL_SIZE;
119 }
120
121 final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
122 if (jitter) {
123 scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
124 } else {
125 scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
126 }
127
128 scheduler.setRemoveOnCancelPolicy(true);
129 scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
130 choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
131 }
132
133 /**
134 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
135 * spawned by this service
136 */
137 public static ChoreService getInstance(final String coreThreadPoolPrefix) {
138 return new ChoreService(coreThreadPoolPrefix);
139 }
140
141 /**
142 * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
143 * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
144 * with a single ChoreService instance).
145 * @return true when the chore was successfully scheduled. false when the scheduling failed
146 * (typically occurs when a chore is scheduled during shutdown of service)
147 */
148 public synchronized boolean scheduleChore(ScheduledChore chore) {
149 if (chore == null) {
150 return false;
151 }
152
153 try {
154 chore.setChoreServicer(this);
155 ScheduledFuture<?> future =
156 scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
157 chore.getTimeUnit());
158 scheduledChores.put(chore, future);
159 return true;
160 } catch (Exception exception) {
161 if (LOG.isInfoEnabled()) {
162 LOG.info("Could not successfully schedule chore: " + chore.getName());
163 }
164 return false;
165 }
166 }
167
168 /**
169 * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
170 * yet then this call is equivalent to a call to scheduleChore.
171 */
172 private synchronized void rescheduleChore(ScheduledChore chore) {
173 if (chore == null) return;
174
175 if (scheduledChores.containsKey(chore)) {
176 ScheduledFuture<?> future = scheduledChores.get(chore);
177 future.cancel(false);
178 }
179 scheduleChore(chore);
180 }
181
182 @Override
183 public synchronized void cancelChore(ScheduledChore chore) {
184 cancelChore(chore, true);
185 }
186
187 @Override
188 public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
189 if (chore != null && scheduledChores.containsKey(chore)) {
190 ScheduledFuture<?> future = scheduledChores.get(chore);
191 future.cancel(mayInterruptIfRunning);
192 scheduledChores.remove(chore);
193
194 // Removing a chore that was missing its start time means it may be possible
195 // to reduce the number of threads
196 if (choresMissingStartTime.containsKey(chore)) {
197 choresMissingStartTime.remove(chore);
198 requestCorePoolDecrease();
199 }
200 }
201 }
202
203 @Override
204 public synchronized boolean isChoreScheduled(ScheduledChore chore) {
205 return chore != null && scheduledChores.containsKey(chore)
206 && !scheduledChores.get(chore).isDone();
207 }
208
209 @Override
210 public synchronized boolean triggerNow(ScheduledChore chore) {
211 if (chore == null) {
212 return false;
213 } else {
214 rescheduleChore(chore);
215 return true;
216 }
217 }
218
219 /**
220 * @return number of chores that this service currently has scheduled
221 */
222 int getNumberOfScheduledChores() {
223 return scheduledChores.size();
224 }
225
226 /**
227 * @return number of chores that this service currently has scheduled that are missing their
228 * scheduled start time
229 */
230 int getNumberOfChoresMissingStartTime() {
231 return choresMissingStartTime.size();
232 }
233
234 /**
235 * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
236 */
237 int getCorePoolSize() {
238 return scheduler.getCorePoolSize();
239 }
240
241 /**
242 * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
243 * daemon threads, and thus, don't prevent the JVM from shutting down
244 */
245 static class ChoreServiceThreadFactory implements ThreadFactory {
246 private final String threadPrefix;
247 private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
248 private AtomicInteger threadNumber = new AtomicInteger(1);
249
250 /**
251 * @param threadPrefix The prefix given to all threads created by this factory
252 */
253 public ChoreServiceThreadFactory(final String threadPrefix) {
254 this.threadPrefix = threadPrefix;
255 }
256
257 @Override
258 public Thread newThread(Runnable r) {
259 Thread thread =
260 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
261 thread.setDaemon(true);
262 return thread;
263 }
264 }
265
266 /**
267 * Represents a request to increase the number of core pool threads. Typically a request
268 * originates from the fact that the current core pool size is not sufficient to service all of
269 * the currently running Chores
270 * @return true when the request to increase the core pool size succeeds
271 */
272 private synchronized boolean requestCorePoolIncrease() {
273 // There is no point in creating more threads than scheduledChores.size since scheduled runs
274 // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
275 // amongst occurrences of the same chore).
276 if (scheduler.getCorePoolSize() < scheduledChores.size()) {
277 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
278 printChoreServiceDetails("requestCorePoolIncrease");
279 return true;
280 }
281 return false;
282 }
283
284 /**
285 * Represents a request to decrease the number of core pool threads. Typically a request
286 * originates from the fact that the current core pool size is more than sufficient to service the
287 * running Chores.
288 */
289 private synchronized void requestCorePoolDecrease() {
290 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
291 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
292 printChoreServiceDetails("requestCorePoolDecrease");
293 }
294 }
295
296 @Override
297 public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
298 if (chore == null || !scheduledChores.containsKey(chore)) return;
299
300 // If the chore has not caused an increase in the size of the core thread pool then request an
301 // increase. This allows each chore missing its start time to increase the core pool size by
302 // at most 1.
303 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
304 choresMissingStartTime.put(chore, requestCorePoolIncrease());
305 }
306
307 // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
308 // the chore is NOT rescheduled, future executions of this chore will be delayed more and
309 // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
310 // idle threads to chores based on how delayed they are.
311 rescheduleChore(chore);
312 printChoreDetails("onChoreMissedStartTime", chore);
313 }
314
315 /**
316 * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
317 * in the middle of execution will be interrupted and shutdown. This service will be unusable
318 * after this method has been called (i.e. future scheduling attempts will fail).
319 */
320 public synchronized void shutdown() {
321 scheduler.shutdownNow();
322 if (LOG.isInfoEnabled()) {
323 LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
324 + " on shutdown");
325 }
326 cancelAllChores(true);
327 scheduledChores.clear();
328 choresMissingStartTime.clear();
329 }
330
331 /**
332 * @return true when the service is shutdown and thus cannot be used anymore
333 */
334 public boolean isShutdown() {
335 return scheduler.isShutdown();
336 }
337
338 /**
339 * @return true when the service is shutdown and all threads have terminated
340 */
341 public boolean isTerminated() {
342 return scheduler.isTerminated();
343 }
344
345 private void cancelAllChores(final boolean mayInterruptIfRunning) {
346 ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>();
347 // Build list of chores to cancel so we can iterate through a set that won't change
348 // as chores are cancelled. If we tried to cancel each chore while iterating through
349 // keySet the results would be undefined because the keySet would be changing
350 for (ScheduledChore chore : scheduledChores.keySet()) {
351 choresToCancel.add(chore);
352 }
353 for (ScheduledChore chore : choresToCancel) {
354 cancelChore(chore, mayInterruptIfRunning);
355 }
356 choresToCancel.clear();
357 }
358
359 /**
360 * Prints a summary of important details about the chore. Used for debugging purposes
361 */
362 private void printChoreDetails(final String header, ScheduledChore chore) {
363 LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
364 output.put(header, "");
365 output.put("Chore name: ", chore.getName());
366 output.put("Chore period: ", Integer.toString(chore.getPeriod()));
367 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
368
369 for (Entry<String, String> entry : output.entrySet()) {
370 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
371 }
372 }
373
374 /**
375 * Prints a summary of important details about the service. Used for debugging purposes
376 */
377 private void printChoreServiceDetails(final String header) {
378 LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
379 output.put(header, "");
380 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
381 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
382 output.put("ChoreService missingStartTimeCount: ",
383 Integer.toString(getNumberOfChoresMissingStartTime()));
384
385 for (Entry<String, String> entry : output.entrySet()) {
386 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
387 }
388 }
389 }