View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.Collection;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.RejectedExecutionException;
28  import java.util.concurrent.SynchronousQueue;
29  import java.util.concurrent.ThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.DaemonThreadFactory;
36  import org.apache.hadoop.hbase.errorhandling.ForeignException;
37  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
38  
39  import com.google.common.collect.MapMaker;
40  
41  /**
42   * This is the master side of a distributed complex procedure execution.
43   * <p>
44   * The {@link Procedure} is generic and subclassing or customization shouldn't be
45   * necessary -- any customization should happen just in {@link Subprocedure}s.
46   */
47  @InterfaceAudience.Private
48  public class ProcedureCoordinator {
49    private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
50  
51    final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
52    final static long TIMEOUT_MILLIS_DEFAULT = 60000;
53    final static long WAKE_MILLIS_DEFAULT = 500;
54  
55    private final ProcedureCoordinatorRpcs rpcs;
56    private final ExecutorService pool;
57    private final long wakeTimeMillis;
58    private final long timeoutMillis;
59  
60    // Running procedure table.  Maps procedure name to running procedure reference
61    private final ConcurrentMap<String, Procedure> procedures =
62        new MapMaker().concurrencyLevel(4).weakValues().makeMap();
63  
64    /**
65     * Create and start a ProcedureCoordinator.
66     *
67     * The rpc object registers the ProcedureCoordinator and starts any threads in this
68     * constructor.
69     *
70     * @param rpcs
71     * @param pool Used for executing procedures.
72     */
73    public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
74      this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
75    }
76  
77    /**
78     * Create and start a ProcedureCoordinator.
79     *
80     * The rpc object registers the ProcedureCoordinator and starts any threads in
81     * this constructor.
82     *
83     * @param rpcs
84     * @param pool Used for executing procedures.
85     * @param timeoutMillis
86     */
87    public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
88        long timeoutMillis, long wakeTimeMillis) {
89      this.timeoutMillis = timeoutMillis;
90      this.wakeTimeMillis = wakeTimeMillis;
91      this.rpcs = rpcs;
92      this.pool = pool;
93      this.rpcs.start(this);
94    }
95  
96    /**
97     * Default thread pool for the procedure
98     *
99     * @param coordName
100    * @param opThreads the maximum number of threads to allow in the pool
101    */
102   public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
103     return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
104   }
105 
106   /**
107    * Default thread pool for the procedure
108    *
109    * @param coordName
110    * @param opThreads the maximum number of threads to allow in the pool
111    * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
112    */
113   public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
114       long keepAliveMillis) {
115     return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
116         new SynchronousQueue<Runnable>(),
117         new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
118   }
119 
120   /**
121    * Shutdown the thread pools and release rpc resources
122    * @throws IOException
123    */
124   public void close() throws IOException {
125     // have to use shutdown now to break any latch waiting
126     pool.shutdownNow();
127     rpcs.close();
128   }
129 
130   /**
131    * Submit an procedure to kick off its dependent subprocedures.
132    * @param proc Procedure to execute
133    * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the
134    *         procedure or any subprocedures could not be started.  Failure could be due to
135    *         submitting a procedure multiple times (or one with the same name), or some sort
136    *         of IO problem.  On errors, the procedure's monitor holds a reference to the exception
137    *         that caused the failure.
138    */
139   boolean submitProcedure(Procedure proc) {
140     // if the submitted procedure was null, then we don't want to run it
141     if (proc == null) {
142       return false;
143     }
144     String procName = proc.getName();
145 
146     // make sure we aren't already running a procedure of that name
147     Procedure oldProc = procedures.get(procName);
148     if (oldProc != null) {
149       // procedures are always eventually completed on both successful and failed execution
150       try {
151         if (!oldProc.isCompleted()) {
152           LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
153           return false;
154         } else {
155           LOG.debug("Procedure " + procName
156               + " was in running list but was completed.  Accepting new attempt.");
157           if (!procedures.remove(procName, oldProc)) {
158             LOG.warn("Procedure " + procName
159                 + " has been resubmitted by another thread. Rejecting this request.");
160             return false;
161           }
162         }
163       } catch (ForeignException e) {
164         LOG.debug("Procedure " + procName
165             + " was in running list but has exception.  Accepting new attempt.");
166         if (!procedures.remove(procName, oldProc)) {
167           LOG.warn("Procedure " + procName
168               + " has been resubmitted by another thread. Rejecting this request.");
169           return false;
170         }
171       }
172     }
173 
174     // kick off the procedure's execution in a separate thread
175     try {
176       if (this.procedures.putIfAbsent(procName, proc) == null) {
177         LOG.debug("Submitting procedure " + procName);
178         this.pool.submit(proc);
179         return true;
180       } else {
181         LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
182         return false;
183       }
184     } catch (RejectedExecutionException e) {
185       LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error.", e);
186       // Remove the procedure from the list since is not started
187       this.procedures.remove(procName, proc);
188       // the thread pool is full and we can't run the procedure
189       proc.receive(new ForeignException(procName, e));
190     }
191     return false;
192   }
193 
194   /**
195    * The connection to the rest of the procedure group (members and coordinator) has been
196    * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other
197    * members since we cannot reach them anymore.
198    * @param message description of the error
199    * @param cause the actual cause of the failure
200    */
201   void rpcConnectionFailure(final String message, final IOException cause) {
202     Collection<Procedure> toNotify = procedures.values();
203 
204     boolean isTraceEnabled = LOG.isTraceEnabled();
205     LOG.debug("received connection failure: " + message, cause);
206     for (Procedure proc : toNotify) {
207       if (proc == null) {
208         continue;
209       }
210       // notify the elements, if they aren't null
211       if (isTraceEnabled) {
212         LOG.trace("connection failure - notify procedure: " + proc.getName());
213       }
214       proc.receive(new ForeignException(proc.getName(), cause));
215     }
216   }
217 
218   /**
219    * Abort the procedure with the given name
220    * @param procName name of the procedure to abort
221    * @param reason serialized information about the abort
222    */
223   public void abortProcedure(String procName, ForeignException reason) {
224     LOG.debug("abort procedure " + procName, reason);
225     // if we know about the Procedure, notify it
226     Procedure proc = procedures.get(procName);
227     if (proc == null) {
228       return;
229     }
230     proc.receive(reason);
231   }
232 
233   /**
234    * Exposed for hooking with unit tests.
235    * @param procName
236    * @param procArgs
237    * @param expectedMembers
238    * @return the newly created procedure
239    */
240   Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
241       List<String> expectedMembers) {
242     // build the procedure
243     return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
244         procName, procArgs, expectedMembers);
245   }
246 
247   /**
248    * Kick off the named procedure
249    * Currently only one procedure with the same type and name is allowed to run at a time.
250    * @param procName name of the procedure to start
251    * @param procArgs arguments for the procedure
252    * @param expectedMembers expected members to start
253    * @return handle to the running procedure, if it was started correctly,
254    *         <tt>null</tt> otherwise.
255    *         Null could be due to submitting a procedure multiple times
256    *         (or one with the same name), or runtime exception.
257    *         Check the procedure's monitor that holds a reference to the exception
258    *         that caused the failure.
259    */
260   public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
261       List<String> expectedMembers) {
262     Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
263     if (!this.submitProcedure(proc)) {
264       LOG.error("Failed to submit procedure '" + procName + "'");
265       return null;
266     }
267     return proc;
268   }
269 
270   /**
271    * Notification that the procedure had the specified member acquired its part of the barrier
272    * via {@link Subprocedure#acquireBarrier()}.
273    * @param procName name of the procedure that acquired
274    * @param member name of the member that acquired
275    */
276   void memberAcquiredBarrier(String procName, final String member) {
277     Procedure proc = procedures.get(procName);
278     if (proc == null) {
279       LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'");
280       return;
281     }
282     if (LOG.isTraceEnabled()) {
283       LOG.trace("Member '"+ member +"' acquired procedure '"+ procName +"'");
284     }
285     proc.barrierAcquiredByMember(member);
286   }
287 
288   /**
289    * Notification that the procedure had another member finished executing its in-barrier subproc
290    * via {@link Subprocedure#insideBarrier()}.
291    * @param procName name of the subprocedure that finished
292    * @param member name of the member that executed and released its barrier
293    * @param dataFromMember the data that the member returned along with the notification
294    */
295   void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
296     Procedure proc = procedures.get(procName);
297     if (proc == null) {
298       LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
299       return;
300     }
301     if (LOG.isTraceEnabled()) {
302       LOG.trace("Member '"+ member +"' released procedure '"+ procName +"'");
303     }
304     proc.barrierReleasedByMember(member, dataFromMember);
305   }
306 
307   /**
308    * @return the rpcs implementation for all current procedures
309    */
310   ProcedureCoordinatorRpcs getRpcs() {
311     return rpcs;
312   }
313 
314   /**
315    * Returns the procedure.  This Procedure is a live instance so should not be modified but can
316    * be inspected.
317    * @param name Name of the procedure
318    * @return Procedure or null if not present any more
319    */
320   public Procedure getProcedure(String name) {
321     return procedures.get(name);
322   }
323 
324   /**
325    * @return Return set of all procedure names.
326    */
327   public Set<String> getProcedureNames() {
328     return new HashSet<String>(procedures.keySet());
329   }
330 }