1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.LinkedHashMap;
24 import java.util.Map.Entry;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import com.google.common.annotations.VisibleForTesting;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 @InterfaceAudience.Private
56 public class ChoreService implements ChoreServicer {
57 private static final Log LOG = LogFactory.getLog(ChoreService.class);
58
59
60
61
62 public final static int MIN_CORE_POOL_SIZE = 1;
63
64
65
66
67 private final ScheduledThreadPoolExecutor scheduler;
68
69
70
71
72 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
73
74
75
76
77
78
79
80 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
81
82
83
84
85
86
87
88 private final String coreThreadPoolPrefix;
89
90
91
92
93
94
95 @VisibleForTesting
96 public ChoreService(final String coreThreadPoolPrefix) {
97 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
98 }
99
100
101
102
103
104 public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
105 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
106 }
107
108
109
110
111
112
113
114
115 public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
116 this.coreThreadPoolPrefix = coreThreadPoolPrefix;
117 if (corePoolSize < MIN_CORE_POOL_SIZE) {
118 corePoolSize = MIN_CORE_POOL_SIZE;
119 }
120
121 final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
122 if (jitter) {
123 scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
124 } else {
125 scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
126 }
127
128 scheduler.setRemoveOnCancelPolicy(true);
129 scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
130 choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
131 }
132
133
134
135
136
137 public static ChoreService getInstance(final String coreThreadPoolPrefix) {
138 return new ChoreService(coreThreadPoolPrefix);
139 }
140
141
142
143
144
145
146
147
148 public synchronized boolean scheduleChore(ScheduledChore chore) {
149 if (chore == null) {
150 return false;
151 }
152
153 try {
154 chore.setChoreServicer(this);
155 ScheduledFuture<?> future =
156 scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
157 chore.getTimeUnit());
158 scheduledChores.put(chore, future);
159 return true;
160 } catch (Exception exception) {
161 if (LOG.isInfoEnabled()) {
162 LOG.info("Could not successfully schedule chore: " + chore.getName());
163 }
164 return false;
165 }
166 }
167
168
169
170
171
172 private synchronized void rescheduleChore(ScheduledChore chore) {
173 if (chore == null) return;
174
175 if (scheduledChores.containsKey(chore)) {
176 ScheduledFuture<?> future = scheduledChores.get(chore);
177 future.cancel(false);
178 }
179 scheduleChore(chore);
180 }
181
182 @Override
183 public synchronized void cancelChore(ScheduledChore chore) {
184 cancelChore(chore, true);
185 }
186
187 @Override
188 public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
189 if (chore != null && scheduledChores.containsKey(chore)) {
190 ScheduledFuture<?> future = scheduledChores.get(chore);
191 future.cancel(mayInterruptIfRunning);
192 scheduledChores.remove(chore);
193
194
195
196 if (choresMissingStartTime.containsKey(chore)) {
197 choresMissingStartTime.remove(chore);
198 requestCorePoolDecrease();
199 }
200 }
201 }
202
203 @Override
204 public synchronized boolean isChoreScheduled(ScheduledChore chore) {
205 return chore != null && scheduledChores.containsKey(chore)
206 && !scheduledChores.get(chore).isDone();
207 }
208
209 @Override
210 public synchronized boolean triggerNow(ScheduledChore chore) {
211 if (chore == null) {
212 return false;
213 } else {
214 rescheduleChore(chore);
215 return true;
216 }
217 }
218
219
220
221
222 int getNumberOfScheduledChores() {
223 return scheduledChores.size();
224 }
225
226
227
228
229
230 int getNumberOfChoresMissingStartTime() {
231 return choresMissingStartTime.size();
232 }
233
234
235
236
237 int getCorePoolSize() {
238 return scheduler.getCorePoolSize();
239 }
240
241
242
243
244
245 static class ChoreServiceThreadFactory implements ThreadFactory {
246 private final String threadPrefix;
247 private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
248 private AtomicInteger threadNumber = new AtomicInteger(1);
249
250
251
252
253 public ChoreServiceThreadFactory(final String threadPrefix) {
254 this.threadPrefix = threadPrefix;
255 }
256
257 @Override
258 public Thread newThread(Runnable r) {
259 Thread thread =
260 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
261 thread.setDaemon(true);
262 return thread;
263 }
264 }
265
266
267
268
269
270
271
272 private synchronized boolean requestCorePoolIncrease() {
273
274
275
276 if (scheduler.getCorePoolSize() < scheduledChores.size()) {
277 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
278 printChoreServiceDetails("requestCorePoolIncrease");
279 return true;
280 }
281 return false;
282 }
283
284
285
286
287
288
289 private synchronized void requestCorePoolDecrease() {
290 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
291 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
292 printChoreServiceDetails("requestCorePoolDecrease");
293 }
294 }
295
296 @Override
297 public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
298 if (chore == null || !scheduledChores.containsKey(chore)) return;
299
300
301
302
303 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
304 choresMissingStartTime.put(chore, requestCorePoolIncrease());
305 }
306
307
308
309
310
311 rescheduleChore(chore);
312 printChoreDetails("onChoreMissedStartTime", chore);
313 }
314
315
316
317
318
319
320 public synchronized void shutdown() {
321 scheduler.shutdownNow();
322 if (LOG.isInfoEnabled()) {
323 LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
324 + " on shutdown");
325 }
326 cancelAllChores(true);
327 scheduledChores.clear();
328 choresMissingStartTime.clear();
329 }
330
331
332
333
334 public boolean isShutdown() {
335 return scheduler.isShutdown();
336 }
337
338
339
340
341 public boolean isTerminated() {
342 return scheduler.isTerminated();
343 }
344
345 private void cancelAllChores(final boolean mayInterruptIfRunning) {
346 ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>();
347
348
349
350 for (ScheduledChore chore : scheduledChores.keySet()) {
351 choresToCancel.add(chore);
352 }
353 for (ScheduledChore chore : choresToCancel) {
354 cancelChore(chore, mayInterruptIfRunning);
355 }
356 choresToCancel.clear();
357 }
358
359
360
361
362 private void printChoreDetails(final String header, ScheduledChore chore) {
363 LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
364 output.put(header, "");
365 output.put("Chore name: ", chore.getName());
366 output.put("Chore period: ", Integer.toString(chore.getPeriod()));
367 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
368
369 for (Entry<String, String> entry : output.entrySet()) {
370 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
371 }
372 }
373
374
375
376
377 private void printChoreServiceDetails(final String header) {
378 LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
379 output.put(header, "");
380 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
381 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
382 output.put("ChoreService missingStartTimeCount: ",
383 Integer.toString(getNumberOfChoresMissingStartTime()));
384
385 for (Entry<String, String> entry : output.entrySet()) {
386 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
387 }
388 }
389 }