1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.ThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.DaemonThreadFactory;
33 import org.apache.hadoop.hbase.errorhandling.ForeignException;
34
35 import com.google.common.collect.MapMaker;
36
37
38
39
40
41
42 @InterfaceAudience.Private
43 public class ProcedureMember implements Closeable {
44 private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
45
46 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
47
48 private final SubprocedureFactory builder;
49 private final ProcedureMemberRpcs rpcs;
50
51 private final ConcurrentMap<String,Subprocedure> subprocs =
52 new MapMaker().concurrencyLevel(4).weakValues().makeMap();
53 private final ExecutorService pool;
54
55
56
57
58
59
60
61
62 public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
63 SubprocedureFactory factory) {
64 this.pool = pool;
65 this.rpcs = rpcs;
66 this.builder = factory;
67 }
68
69
70
71
72
73
74
75 public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
76 return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
77 }
78
79
80
81
82
83
84
85
86 public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
87 long keepAliveMillis) {
88 return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
89 new SynchronousQueue<Runnable>(),
90 new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
91 }
92
93
94
95
96
97
98 ProcedureMemberRpcs getRpcs() {
99 return rpcs;
100 }
101
102
103
104
105
106
107
108
109
110
111 public Subprocedure createSubprocedure(String opName, byte[] data) {
112 return builder.buildSubprocedure(opName, data);
113 }
114
115
116
117
118
119
120
121
122 public boolean submitSubprocedure(Subprocedure subproc) {
123
124 if (subproc == null) {
125 LOG.warn("Submitted null subprocedure, nothing to run here.");
126 return false;
127 }
128
129 String procName = subproc.getName();
130 if (procName == null || procName.length() == 0) {
131 LOG.error("Subproc name cannot be null or the empty string");
132 return false;
133 }
134
135
136 Subprocedure rsub = subprocs.get(procName);
137 if (rsub != null) {
138 if (!rsub.isComplete()) {
139 LOG.error("Subproc '" + procName + "' is already running. Bailing out");
140 return false;
141 }
142 LOG.warn("A completed old subproc " + procName + " is still present, removing");
143 if (!subprocs.remove(procName, rsub)) {
144 LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
145 return false;
146 }
147 }
148
149 LOG.debug("Submitting new Subprocedure:" + procName);
150
151
152 try {
153 if (subprocs.putIfAbsent(procName, subproc) == null) {
154 this.pool.submit(subproc);
155 return true;
156 } else {
157 LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
158 return false;
159 }
160 } catch (RejectedExecutionException e) {
161 subprocs.remove(procName, subproc);
162
163
164 String msg = "Subprocedure pool is full!";
165 subproc.cancel(msg, e.getCause());
166 }
167
168 LOG.error("Failed to start subprocedure '" + procName + "'");
169 return false;
170 }
171
172
173
174
175
176 public void receivedReachedGlobalBarrier(String procName) {
177 Subprocedure subproc = subprocs.get(procName);
178 if (subproc == null) {
179 LOG.warn("Unexpected reached globa barrier message for Sub-Procedure '" + procName + "'");
180 return;
181 }
182 if (LOG.isTraceEnabled()) {
183 LOG.trace("reached global barrier message for Sub-Procedure '" + procName + "'");
184 }
185 subproc.receiveReachedGlobalBarrier();
186 }
187
188
189
190
191 @Override
192 public void close() throws IOException {
193
194 pool.shutdownNow();
195 }
196
197
198
199
200
201
202
203 boolean closeAndWait(long timeoutMs) throws InterruptedException {
204 pool.shutdown();
205 return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
206 }
207
208
209
210
211
212
213
214
215
216 public void controllerConnectionFailure(final String message, final Throwable cause,
217 final String procName) {
218 LOG.error(message, cause);
219 if (procName == null) {
220 return;
221 }
222 Subprocedure toNotify = subprocs.get(procName);
223 if (toNotify != null) {
224 toNotify.cancel(message, cause);
225 }
226 }
227
228
229
230
231
232
233 public void receiveAbortProcedure(String procName, ForeignException ee) {
234 LOG.debug("Request received to abort procedure " + procName, ee);
235
236 Subprocedure sub = subprocs.get(procName);
237 if (sub == null) {
238 LOG.info("Received abort on procedure with no local subprocedure " + procName +
239 ", ignoring it.", ee);
240 return;
241 }
242 String msg = "Propagating foreign exception to subprocedure " + sub.getName();
243 LOG.error(msg, ee);
244 sub.cancel(msg, ee);
245 }
246 }