1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.RejectedExecutionException;
28 import java.util.concurrent.SynchronousQueue;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.DaemonThreadFactory;
36 import org.apache.hadoop.hbase.errorhandling.ForeignException;
37 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
38
39 import com.google.common.collect.MapMaker;
40
41
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class ProcedureCoordinator {
49 private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
50
51 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
52 final static long TIMEOUT_MILLIS_DEFAULT = 60000;
53 final static long WAKE_MILLIS_DEFAULT = 500;
54
55 private final ProcedureCoordinatorRpcs rpcs;
56 private final ExecutorService pool;
57 private final long wakeTimeMillis;
58 private final long timeoutMillis;
59
60
61 private final ConcurrentMap<String, Procedure> procedures =
62 new MapMaker().concurrencyLevel(4).weakValues().makeMap();
63
64
65
66
67
68
69
70
71
72
73 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
74 this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
75 }
76
77
78
79
80
81
82
83
84
85
86
87 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
88 long timeoutMillis, long wakeTimeMillis) {
89 this.timeoutMillis = timeoutMillis;
90 this.wakeTimeMillis = wakeTimeMillis;
91 this.rpcs = rpcs;
92 this.pool = pool;
93 this.rpcs.start(this);
94 }
95
96
97
98
99
100
101
102 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
103 return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
104 }
105
106
107
108
109
110
111
112
113 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
114 long keepAliveMillis) {
115 return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
116 new SynchronousQueue<Runnable>(),
117 new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
118 }
119
120
121
122
123
124 public void close() throws IOException {
125
126 pool.shutdownNow();
127 rpcs.close();
128 }
129
130
131
132
133
134
135
136
137
138
139 boolean submitProcedure(Procedure proc) {
140
141 if (proc == null) {
142 return false;
143 }
144 String procName = proc.getName();
145
146
147 Procedure oldProc = procedures.get(procName);
148 if (oldProc != null) {
149
150 try {
151 if (!oldProc.isCompleted()) {
152 LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
153 return false;
154 } else {
155 LOG.debug("Procedure " + procName
156 + " was in running list but was completed. Accepting new attempt.");
157 if (!procedures.remove(procName, oldProc)) {
158 LOG.warn("Procedure " + procName
159 + " has been resubmitted by another thread. Rejecting this request.");
160 return false;
161 }
162 }
163 } catch (ForeignException e) {
164 LOG.debug("Procedure " + procName
165 + " was in running list but has exception. Accepting new attempt.");
166 if (!procedures.remove(procName, oldProc)) {
167 LOG.warn("Procedure " + procName
168 + " has been resubmitted by another thread. Rejecting this request.");
169 return false;
170 }
171 }
172 }
173
174
175 try {
176 if (this.procedures.putIfAbsent(procName, proc) == null) {
177 LOG.debug("Submitting procedure " + procName);
178 this.pool.submit(proc);
179 return true;
180 } else {
181 LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
182 return false;
183 }
184 } catch (RejectedExecutionException e) {
185 LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error.", e);
186
187 this.procedures.remove(procName, proc);
188
189 proc.receive(new ForeignException(procName, e));
190 }
191 return false;
192 }
193
194
195
196
197
198
199
200
201 void rpcConnectionFailure(final String message, final IOException cause) {
202 Collection<Procedure> toNotify = procedures.values();
203
204 boolean isTraceEnabled = LOG.isTraceEnabled();
205 LOG.debug("received connection failure: " + message, cause);
206 for (Procedure proc : toNotify) {
207 if (proc == null) {
208 continue;
209 }
210
211 if (isTraceEnabled) {
212 LOG.trace("connection failure - notify procedure: " + proc.getName());
213 }
214 proc.receive(new ForeignException(proc.getName(), cause));
215 }
216 }
217
218
219
220
221
222
223 public void abortProcedure(String procName, ForeignException reason) {
224 LOG.debug("abort procedure " + procName, reason);
225
226 Procedure proc = procedures.get(procName);
227 if (proc == null) {
228 return;
229 }
230 proc.receive(reason);
231 }
232
233
234
235
236
237
238
239
240 Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
241 List<String> expectedMembers) {
242
243 return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
244 procName, procArgs, expectedMembers);
245 }
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260 public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
261 List<String> expectedMembers) {
262 Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
263 if (!this.submitProcedure(proc)) {
264 LOG.error("Failed to submit procedure '" + procName + "'");
265 return null;
266 }
267 return proc;
268 }
269
270
271
272
273
274
275
276 void memberAcquiredBarrier(String procName, final String member) {
277 Procedure proc = procedures.get(procName);
278 if (proc == null) {
279 LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'");
280 return;
281 }
282 if (LOG.isTraceEnabled()) {
283 LOG.trace("Member '"+ member +"' acquired procedure '"+ procName +"'");
284 }
285 proc.barrierAcquiredByMember(member);
286 }
287
288
289
290
291
292
293
294
295 void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
296 Procedure proc = procedures.get(procName);
297 if (proc == null) {
298 LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
299 return;
300 }
301 if (LOG.isTraceEnabled()) {
302 LOG.trace("Member '"+ member +"' released procedure '"+ procName +"'");
303 }
304 proc.barrierReleasedByMember(member, dataFromMember);
305 }
306
307
308
309
310 ProcedureCoordinatorRpcs getRpcs() {
311 return rpcs;
312 }
313
314
315
316
317
318
319
320 public Procedure getProcedure(String name) {
321 return procedures.get(name);
322 }
323
324
325
326
327 public Set<String> getProcedureNames() {
328 return new HashSet<String>(procedures.keySet());
329 }
330 }