1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.LinkedHashMap;
24 import java.util.Map.Entry;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 public class ChoreService implements ChoreServicer {
56 private final Log LOG = LogFactory.getLog(this.getClass());
57
58
59
60
61 public final static int MIN_CORE_POOL_SIZE = 1;
62
63
64
65
66 private final ScheduledThreadPoolExecutor scheduler;
67
68
69
70
71 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
72
73
74
75
76
77
78
79 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
80
81
82
83
84
85
86
87 private final String coreThreadPoolPrefix;
88
89
90
91
92
93 public ChoreService(final String coreThreadPoolPrefix) {
94 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE);
95 }
96
97
98
99
100
101
102
103
104 public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) {
105 this.coreThreadPoolPrefix = coreThreadPoolPrefix;
106 if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE;
107 final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
108 scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
109 scheduler.setRemoveOnCancelPolicy(true);
110 scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
111 choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
112 }
113
114
115
116
117
118 public static ChoreService getInstance(final String coreThreadPoolPrefix) {
119 return new ChoreService(coreThreadPoolPrefix);
120 }
121
122
123
124
125
126
127
128
129 public synchronized boolean scheduleChore(ScheduledChore chore) {
130 if (chore == null) return false;
131
132 try {
133 chore.setChoreServicer(this);
134 ScheduledFuture<?> future =
135 scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
136 chore.getTimeUnit());
137 scheduledChores.put(chore, future);
138 return true;
139 } catch (Exception exception) {
140 if (LOG.isInfoEnabled()) {
141 LOG.info("Could not successfully schedule chore: " + chore.getName());
142 }
143 return false;
144 }
145 }
146
147
148
149
150
151 private synchronized void rescheduleChore(ScheduledChore chore) {
152 if (chore == null) return;
153
154 if (scheduledChores.containsKey(chore)) {
155 ScheduledFuture<?> future = scheduledChores.get(chore);
156 future.cancel(false);
157 }
158 scheduleChore(chore);
159 }
160
161 @Override
162 public synchronized void cancelChore(ScheduledChore chore) {
163 cancelChore(chore, true);
164 }
165
166 @Override
167 public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
168 if (chore != null && scheduledChores.containsKey(chore)) {
169 ScheduledFuture<?> future = scheduledChores.get(chore);
170 future.cancel(mayInterruptIfRunning);
171 scheduledChores.remove(chore);
172
173
174
175 if (choresMissingStartTime.containsKey(chore)) {
176 choresMissingStartTime.remove(chore);
177 requestCorePoolDecrease();
178 }
179 }
180 }
181
182 @Override
183 public synchronized boolean isChoreScheduled(ScheduledChore chore) {
184 return chore != null && scheduledChores.containsKey(chore)
185 && !scheduledChores.get(chore).isDone();
186 }
187
188 @Override
189 public synchronized boolean triggerNow(ScheduledChore chore) {
190 if (chore == null) {
191 return false;
192 } else {
193 rescheduleChore(chore);
194 return true;
195 }
196 }
197
198
199
200
201 int getNumberOfScheduledChores() {
202 return scheduledChores.size();
203 }
204
205
206
207
208
209 int getNumberOfChoresMissingStartTime() {
210 return choresMissingStartTime.size();
211 }
212
213
214
215
216 int getCorePoolSize() {
217 return scheduler.getCorePoolSize();
218 }
219
220
221
222
223
224 static class ChoreServiceThreadFactory implements ThreadFactory {
225 private final String threadPrefix;
226 private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
227 private AtomicInteger threadNumber = new AtomicInteger(1);
228
229
230
231
232 public ChoreServiceThreadFactory(final String threadPrefix) {
233 this.threadPrefix = threadPrefix;
234 }
235
236 @Override
237 public Thread newThread(Runnable r) {
238 Thread thread =
239 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
240 thread.setDaemon(true);
241 return thread;
242 }
243 }
244
245
246
247
248
249
250
251 private synchronized boolean requestCorePoolIncrease() {
252
253
254
255 if (scheduler.getCorePoolSize() < scheduledChores.size()) {
256 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
257 printChoreServiceDetails("requestCorePoolIncrease");
258 return true;
259 }
260 return false;
261 }
262
263
264
265
266
267
268 private synchronized void requestCorePoolDecrease() {
269 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
270 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
271 printChoreServiceDetails("requestCorePoolDecrease");
272 }
273 }
274
275 @Override
276 public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
277 if (chore == null || !scheduledChores.containsKey(chore)) return;
278
279
280
281
282 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
283 choresMissingStartTime.put(chore, requestCorePoolIncrease());
284 }
285
286
287
288
289
290 rescheduleChore(chore);
291 printChoreDetails("onChoreMissedStartTime", chore);
292 }
293
294
295
296
297
298
299 public synchronized void shutdown() {
300 scheduler.shutdownNow();
301 if (LOG.isInfoEnabled()) {
302 LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
303 + " on shutdown");
304 }
305 cancelAllChores(true);
306 scheduledChores.clear();
307 choresMissingStartTime.clear();
308 }
309
310
311
312
313 public boolean isShutdown() {
314 return scheduler.isShutdown();
315 }
316
317
318
319
320 public boolean isTerminated() {
321 return scheduler.isTerminated();
322 }
323
324 private void cancelAllChores(final boolean mayInterruptIfRunning) {
325 ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>();
326
327
328
329 for (ScheduledChore chore : scheduledChores.keySet()) {
330 choresToCancel.add(chore);
331 }
332 for (ScheduledChore chore : choresToCancel) {
333 cancelChore(chore, mayInterruptIfRunning);
334 }
335 choresToCancel.clear();
336 }
337
338
339
340
341 private void printChoreDetails(final String header, ScheduledChore chore) {
342 LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
343 output.put(header, "");
344 output.put("Chore name: ", chore.getName());
345 output.put("Chore period: ", Integer.toString(chore.getPeriod()));
346 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
347
348 for (Entry<String, String> entry : output.entrySet()) {
349 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
350 }
351 }
352
353
354
355
356 private void printChoreServiceDetails(final String header) {
357 LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
358 output.put(header, "");
359 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
360 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
361 output.put("ChoreService missingStartTimeCount: ",
362 Integer.toString(getNumberOfChoresMissingStartTime()));
363
364 for (Entry<String, String> entry : output.entrySet()) {
365 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
366 }
367 }
368 }