1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.Collection;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.RejectedExecutionException;
28  import java.util.concurrent.SynchronousQueue;
29  import java.util.concurrent.ThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.DaemonThreadFactory;
36  import org.apache.hadoop.hbase.errorhandling.ForeignException;
37  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
38  
39  import com.google.common.collect.MapMaker;
40  
41  
42  
43  
44  
45  
46  
47  @InterfaceAudience.Private
48  public class ProcedureCoordinator {
49    private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
50  
51    final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
52    final static long TIMEOUT_MILLIS_DEFAULT = 60000;
53    final static long WAKE_MILLIS_DEFAULT = 500;
54  
55    private final ProcedureCoordinatorRpcs rpcs;
56    private final ExecutorService pool;
57    private final long wakeTimeMillis;
58    private final long timeoutMillis;
59  
60    
61    private final ConcurrentMap<String, Procedure> procedures =
62        new MapMaker().concurrencyLevel(4).weakValues().makeMap();
63  
64    
65  
66  
67  
68  
69  
70  
71  
72  
73    public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
74      this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
75    }
76  
77    
78  
79  
80  
81  
82  
83  
84  
85  
86  
87    public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
88        long timeoutMillis, long wakeTimeMillis) {
89      this.timeoutMillis = timeoutMillis;
90      this.wakeTimeMillis = wakeTimeMillis;
91      this.rpcs = rpcs;
92      this.pool = pool;
93      this.rpcs.start(this);
94    }
95  
96    
97  
98  
99  
100 
101 
102   public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
103     return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
104   }
105 
106   
107 
108 
109 
110 
111 
112 
113   public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
114       long keepAliveMillis) {
115     return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
116         new SynchronousQueue<Runnable>(),
117         new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
118   }
119 
120   
121 
122 
123 
124   public void close() throws IOException {
125     
126     pool.shutdownNow();
127     rpcs.close();
128   }
129 
130   
131 
132 
133 
134 
135 
136 
137 
138 
139   boolean submitProcedure(Procedure proc) {
140     
141     if (proc == null) {
142       return false;
143     }
144     String procName = proc.getName();
145 
146     
147     Procedure oldProc = procedures.get(procName);
148     if (oldProc != null) {
149       
150       try {
151         if (!oldProc.isCompleted()) {
152           LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
153           return false;
154         } else {
155           LOG.debug("Procedure " + procName
156               + " was in running list but was completed.  Accepting new attempt.");
157           if (!procedures.remove(procName, oldProc)) {
158             LOG.warn("Procedure " + procName
159                 + " has been resubmitted by another thread. Rejecting this request.");
160             return false;
161           }
162         }
163       } catch (ForeignException e) {
164         LOG.debug("Procedure " + procName
165             + " was in running list but has exception.  Accepting new attempt.");
166         if (!procedures.remove(procName, oldProc)) {
167           LOG.warn("Procedure " + procName
168               + " has been resubmitted by another thread. Rejecting this request.");
169           return false;
170         }
171       }
172     }
173 
174     
175     try {
176       if (this.procedures.putIfAbsent(procName, proc) == null) {
177         this.pool.submit(proc);
178         return true;
179       } else {
180         LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
181         return false;
182       }
183     } catch (RejectedExecutionException e) {
184       LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error.", e);
185       
186       this.procedures.remove(procName, proc);
187       
188       proc.receive(new ForeignException(procName, e));
189     }
190     return false;
191   }
192 
193   
194 
195 
196 
197 
198 
199 
200   void rpcConnectionFailure(final String message, final IOException cause) {
201     Collection<Procedure> toNotify = procedures.values();
202 
203     for (Procedure proc : toNotify) {
204       if (proc == null) {
205         continue;
206       }
207       
208       proc.receive(new ForeignException(proc.getName(), cause));
209     }
210   }
211 
212   
213 
214 
215 
216 
217   public void abortProcedure(String procName, ForeignException reason) {
218     
219     Procedure proc = procedures.get(procName);
220     if (proc == null) {
221       return;
222     }
223     proc.receive(reason);
224   }
225 
226   
227 
228 
229 
230 
231 
232 
233   Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
234       List<String> expectedMembers) {
235     
236     return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
237         procName, procArgs, expectedMembers);
238   }
239 
240   
241 
242 
243 
244 
245 
246 
247 
248 
249 
250 
251 
252 
253   public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
254       List<String> expectedMembers) {
255     Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
256     if (!this.submitProcedure(proc)) {
257       LOG.error("Failed to submit procedure '" + procName + "'");
258       return null;
259     }
260     return proc;
261   }
262 
263   
264 
265 
266 
267 
268 
269   void memberAcquiredBarrier(String procName, final String member) {
270     Procedure proc = procedures.get(procName);
271     if (proc == null) {
272       LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'");
273       return;
274     }
275 
276     proc.barrierAcquiredByMember(member);
277   }
278 
279   
280 
281 
282 
283 
284 
285 
286   void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
287     Procedure proc = procedures.get(procName);
288     if (proc == null) {
289       LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
290       return;
291     }
292     proc.barrierReleasedByMember(member, dataFromMember);
293   }
294 
295   
296 
297 
298   ProcedureCoordinatorRpcs getRpcs() {
299     return rpcs;
300   }
301 
302   
303 
304 
305 
306 
307 
308   public Procedure getProcedure(String name) {
309     return procedures.get(name);
310   }
311 
312   
313 
314 
315   public Set<String> getProcedureNames() {
316     return new HashSet<String>(procedures.keySet());
317   }
318 }