View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.Collection;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Future;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.SynchronousQueue;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.DaemonThreadFactory;
37  import org.apache.hadoop.hbase.errorhandling.ForeignException;
38  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
39  
40  import com.google.common.collect.MapMaker;
41  
42  /**
43   * This is the master side of a distributed complex procedure execution.
44   * <p>
45   * The {@link Procedure} is generic and subclassing or customization shouldn't be
46   * necessary -- any customization should happen just in {@link Subprocedure}s.
47   */
48  @InterfaceAudience.Private
49  public class ProcedureCoordinator {
50    private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
51  
52    final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
53    final static long TIMEOUT_MILLIS_DEFAULT = 60000;
54    final static long WAKE_MILLIS_DEFAULT = 500;
55  
56    private final ProcedureCoordinatorRpcs rpcs;
57    private final ExecutorService pool;
58    private final long wakeTimeMillis;
59    private final long timeoutMillis;
60  
61    // Running procedure table.  Maps procedure name to running procedure reference
62    private final ConcurrentMap<String, Procedure> procedures =
63        new MapMaker().concurrencyLevel(4).weakValues().makeMap();
64  
65    /**
66     * Create and start a ProcedureCoordinator.
67     *
68     * The rpc object registers the ProcedureCoordinator and starts any threads in this
69     * constructor.
70     *
71     * @param rpcs
72     * @param pool Used for executing procedures.
73     */
74    public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
75      this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
76    }
77  
78    /**
79     * Create and start a ProcedureCoordinator.
80     *
81     * The rpc object registers the ProcedureCoordinator and starts any threads in
82     * this constructor.
83     *
84     * @param rpcs
85     * @param pool Used for executing procedures.
86     * @param timeoutMillis
87     */
88    public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
89        long timeoutMillis, long wakeTimeMillis) {
90      this.timeoutMillis = timeoutMillis;
91      this.wakeTimeMillis = wakeTimeMillis;
92      this.rpcs = rpcs;
93      this.pool = pool;
94      this.rpcs.start(this);
95    }
96  
97    /**
98     * Default thread pool for the procedure
99     *
100    * @param coordName
101    * @param opThreads the maximum number of threads to allow in the pool
102    */
103   public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
104     return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
105   }
106 
107   /**
108    * Default thread pool for the procedure
109    *
110    * @param coordName
111    * @param opThreads the maximum number of threads to allow in the pool
112    * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
113    */
114   public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
115       long keepAliveMillis) {
116     return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
117         new SynchronousQueue<Runnable>(),
118         new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
119   }
120 
121   /**
122    * Shutdown the thread pools and release rpc resources
123    * @throws IOException
124    */
125   public void close() throws IOException {
126     // have to use shutdown now to break any latch waiting
127     pool.shutdownNow();
128     rpcs.close();
129   }
130 
131   /**
132    * Submit an procedure to kick off its dependent subprocedures.
133    * @param proc Procedure to execute
134    * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the
135    *         procedure or any subprocedures could not be started.  Failure could be due to
136    *         submitting a procedure multiple times (or one with the same name), or some sort
137    *         of IO problem.  On errors, the procedure's monitor holds a reference to the exception
138    *         that caused the failure.
139    */
140   boolean submitProcedure(Procedure proc) {
141     // if the submitted procedure was null, then we don't want to run it
142     if (proc == null) {
143       return false;
144     }
145     String procName = proc.getName();
146 
147     // make sure we aren't already running a procedure of that name
148     synchronized (procedures) {
149       Procedure oldProc = procedures.get(procName);
150       if (oldProc != null) {
151         // procedures are always eventually completed on both successful and failed execution
152         try {
153           if (!oldProc.isCompleted()) {
154             LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
155             return false;
156           }
157           else {
158             LOG.debug("Procedure " + procName
159               + " was in running list but was completed.  Accepting new attempt.");
160             procedures.remove(procName);
161           }
162         } catch (ForeignException e) {
163           LOG.debug("Procedure " + procName
164             + " was in running list but has exception.  Accepting new attempt.");
165           procedures.remove(procName);
166         }
167       }
168     }
169 
170     // kick off the procedure's execution in a separate thread
171     Future<Void> f = null;
172     try {
173       synchronized (procedures) {
174         this.procedures.put(procName, proc);
175         f = this.pool.submit(proc);
176       }
177       return true;
178     } catch (RejectedExecutionException e) {
179       LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error and " +
180           "cancelling operation.", e);
181       // Remove the procedure from the list since is not started
182       this.procedures.remove(procName);
183       // the thread pool is full and we can't run the procedure
184       proc.receive(new ForeignException(procName, e));
185 
186       // cancel procedure proactively
187       if (f != null) {
188         f.cancel(true);
189       }
190     }
191     return false;
192   }
193 
194   /**
195    * The connection to the rest of the procedure group (members and coordinator) has been
196    * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other
197    * members since we cannot reach them anymore.
198    * @param message description of the error
199    * @param cause the actual cause of the failure
200    */
201   void rpcConnectionFailure(final String message, final IOException cause) {
202     Collection<Procedure> toNotify = procedures.values();
203 
204     for (Procedure proc : toNotify) {
205       if (proc == null) {
206         continue;
207       }
208       // notify the elements, if they aren't null
209       proc.receive(new ForeignException(proc.getName(), cause));
210     }
211   }
212 
213   /**
214    * Abort the procedure with the given name
215    * @param procName name of the procedure to abort
216    * @param reason serialized information about the abort
217    */
218   public void abortProcedure(String procName, ForeignException reason) {
219     // if we know about the Procedure, notify it
220     synchronized(procedures) {
221       Procedure proc = procedures.get(procName);
222       if (proc == null) {
223         return;
224       }
225       proc.receive(reason);
226     }
227   }
228 
229   /**
230    * Exposed for hooking with unit tests.
231    * @param procName
232    * @param procArgs
233    * @param expectedMembers
234    * @return the newly created procedure
235    */
236   Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
237       List<String> expectedMembers) {
238     // build the procedure
239     return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
240         procName, procArgs, expectedMembers);
241   }
242 
243   /**
244    * Kick off the named procedure
245    * Currently only one procedure with the same type and name is allowed to run at a time.
246    * @param procName name of the procedure to start
247    * @param procArgs arguments for the procedure
248    * @param expectedMembers expected members to start
249    * @return handle to the running procedure, if it was started correctly,
250    *         <tt>null</tt> otherwise.
251    *         Null could be due to submitting a procedure multiple times
252    *         (or one with the same name), or runtime exception.
253    *         Check the procedure's monitor that holds a reference to the exception
254    *         that caused the failure.
255    */
256   public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
257       List<String> expectedMembers) {
258     Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
259     if (!this.submitProcedure(proc)) {
260       LOG.error("Failed to submit procedure '" + procName + "'");
261       return null;
262     }
263     return proc;
264   }
265 
266   /**
267    * Notification that the procedure had the specified member acquired its part of the barrier
268    * via {@link Subprocedure#acquireBarrier()}.
269    * @param procName name of the procedure that acquired
270    * @param member name of the member that acquired
271    */
272   void memberAcquiredBarrier(String procName, final String member) {
273     Procedure proc = procedures.get(procName);
274     if (proc == null) {
275       LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'");
276       return;
277     }
278 
279     proc.barrierAcquiredByMember(member);
280   }
281 
282   /**
283    * Notification that the procedure had another member finished executing its in-barrier subproc
284    * via {@link Subprocedure#insideBarrier()}.
285    * @param procName name of the subprocedure that finished
286    * @param member name of the member that executed and released its barrier
287    * @param dataFromMember the data that the member returned along with the notification
288    */
289   void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
290     Procedure proc = procedures.get(procName);
291     if (proc == null) {
292       LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
293       return;
294     }
295     proc.barrierReleasedByMember(member, dataFromMember);
296   }
297 
298   /**
299    * @return the rpcs implementation for all current procedures
300    */
301   ProcedureCoordinatorRpcs getRpcs() {
302     return rpcs;
303   }
304 
305   /**
306    * Returns the procedure.  This Procedure is a live instance so should not be modified but can
307    * be inspected.
308    * @param name Name of the procedure
309    * @return Procedure or null if not present any more
310    */
311   public Procedure getProcedure(String name) {
312     return procedures.get(name);
313   }
314 
315   /**
316    * @return Return set of all procedure names.
317    */
318   public Set<String> getProcedureNames() {
319     return new HashSet<String>(procedures.keySet());
320   }
321 }