View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.errorhandling.ForeignException;
31  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
32  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
33  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
34  import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
35  
36  import com.google.common.collect.Lists;
37  
38  /**
39   * A globally-barriered distributed procedure.  This class encapsulates state and methods for
40   * tracking and managing a distributed procedure, as well as aborting if any member encounters
41   * a problem or if a cancellation is requested.
42   * <p>
43   * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
44   * method.  The procedure contacts all members and waits for all subprocedures to execute
45   * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
46   * send acquisition info back to the coordinator.  If all acquisitions at subprocedures succeed,
47   * the coordinator then will call {@link #sendGlobalBarrierReached()}.  This notifies members to
48   * execute the {@link Subprocedure#insideBarrier()} method.  The procedure is blocked until all
49   * {@link Subprocedure#insideBarrier} executions complete at the members.  When
50   * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
51   * the coordinator.  Once all members complete, the coordinator calls
52   * {@link #sendGlobalBarrierComplete()}.
53   * <p>
54   * If errors are encountered remotely, they are forwarded to the coordinator, and
55   * {@link Subprocedure#cleanup(Exception)} is called.
56   * <p>
57   * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
58   * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
59   * an {@link ForeignException} to abort the procedure.  This is particularly useful for situations
60   * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
61   * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
62   * <p>
63   * Users should generally not directly create or subclass instances of this.  They are created
64   * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
65   * String, byte[], List)}}
66   */
67  @InterfaceAudience.Private
68  public class Procedure implements Callable<Void>, ForeignExceptionListener {
69    private static final Log LOG = LogFactory.getLog(Procedure.class);
70  
71    //
72    // Arguments and naming
73    //
74  
75    // Name of the procedure
76    final private String procName;
77    // Arguments for this procedure execution
78    final private byte[] args;
79  
80    //
81    // Execution State
82    //
83    /** latch for waiting until all members have acquire in barrier state */
84    final CountDownLatch acquiredBarrierLatch;
85    /** latch for waiting until all members have executed and released their in barrier state */
86    final CountDownLatch releasedBarrierLatch;
87    /** latch for waiting until a procedure has completed */
88    final CountDownLatch completedLatch;
89    /** monitor to check for errors */
90    private final ForeignExceptionDispatcher monitor;
91  
92    //
93    // Execution Timeout Handling.
94    //
95  
96    /** frequency to check for errors (ms) */
97    protected final long wakeFrequency;
98    protected final TimeoutExceptionInjector timeoutInjector;
99  
100   //
101   // Members' and Coordinator's state
102   //
103 
104   /** lock to prevent nodes from acquiring and then releasing before we can track them */
105   private Object joinBarrierLock = new Object();
106   private final List<String> acquiringMembers;
107   private final List<String> inBarrierMembers;
108   private ProcedureCoordinator coord;
109 
110   /**
111    * Creates a procedure. (FOR TESTING)
112    *
113    * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
114    * @param coord coordinator to call back to for general errors (e.g.
115    *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
116    * @param monitor error monitor to check for external errors
117    * @param wakeFreq frequency to check for errors while waiting
118    * @param timeout amount of time to allow the procedure to run before cancelling
119    * @param procName name of the procedure instance
120    * @param args argument data associated with the procedure instance
121    * @param expectedMembers names of the expected members
122    */
123   public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
124       long timeout, String procName, byte[] args, List<String> expectedMembers) {
125     this.coord = coord;
126     this.acquiringMembers = new ArrayList<String>(expectedMembers);
127     this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
128     this.procName = procName;
129     this.args = args;
130     this.monitor = monitor;
131     this.wakeFrequency = wakeFreq;
132 
133     int count = expectedMembers.size();
134     this.acquiredBarrierLatch = new CountDownLatch(count);
135     this.releasedBarrierLatch = new CountDownLatch(count);
136     this.completedLatch = new CountDownLatch(1);
137     this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
138   }
139 
140   /**
141    * Create a procedure.
142    *
143    * Users should generally not directly create instances of this.  They are created them
144    * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
145    * String, byte[], List)}}
146    *
147    * @param coord coordinator to call back to for general errors (e.g.
148    *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
149    * @param wakeFreq frequency to check for errors while waiting
150    * @param timeout amount of time to allow the procedure to run before cancelling
151    * @param procName name of the procedure instance
152    * @param args argument data associated with the procedure instance
153    * @param expectedMembers names of the expected members
154    */
155   public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
156       String procName, byte[] args, List<String> expectedMembers) {
157     this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
158         expectedMembers);
159   }
160 
161   public String getName() {
162     return procName;
163   }
164 
165   /**
166    * @return String of the procedure members both trying to enter the barrier and already in barrier
167    */
168   public String getStatus() {
169     String waiting, done;
170     synchronized (joinBarrierLock) {
171       waiting = acquiringMembers.toString();
172       done = inBarrierMembers.toString();
173     }
174     return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
175   }
176 
177   /**
178    * Get the ForeignExceptionDispatcher
179    * @return the Procedure's monitor.
180    */
181   public ForeignExceptionDispatcher getErrorMonitor() {
182     return monitor;
183   }
184 
185   /**
186    * This call is the main execution thread of the barriered procedure.  It sends messages and
187    * essentially blocks until all procedure members acquire or later complete but periodically
188    * checks for foreign exceptions.
189    */
190   @Override
191   @SuppressWarnings("finally")
192   final public Void call() {
193     LOG.info("Starting procedure '" + procName + "'");
194     // start the timer
195     timeoutInjector.start();
196 
197     // run the procedure
198     try {
199       // start by checking for error first
200       monitor.rethrowException();
201       LOG.debug("Procedure '" + procName + "' starting 'acquire'");
202       sendGlobalBarrierStart();
203 
204       // wait for all the members to report acquisition
205       LOG.debug("Waiting for all members to 'acquire'");
206       waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
207       monitor.rethrowException();
208 
209       LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
210       sendGlobalBarrierReached();
211 
212       // wait for all members to report barrier release
213       waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
214 
215       // make sure we didn't get an error during in barrier execution and release
216       monitor.rethrowException();
217       LOG.info("Procedure '" + procName + "' execution completed");
218     } catch (Exception e) {
219       if (e instanceof InterruptedException) {
220         Thread.currentThread().interrupt();
221       }
222       String msg = "Procedure '" + procName +"' execution failed!";
223       LOG.error(msg, e);
224       receive(new ForeignException(getName(), e));
225     } finally {
226       LOG.debug("Running finish phase.");
227       sendGlobalBarrierComplete();
228       completedLatch.countDown();
229 
230       // tell the timer we are done, if we get here successfully
231       timeoutInjector.complete();
232       return null;
233     }
234   }
235 
236   /**
237    * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
238    * the {@link Subprocedure#acquireBarrier} step.
239    * @throws ForeignException
240    */
241   public void sendGlobalBarrierStart() throws ForeignException {
242     // start the procedure
243     LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
244     try {
245       // send procedure barrier start to specified list of members. cloning the list to avoid
246       // concurrent modification from the controller setting the prepared nodes
247       coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
248     } catch (IOException e) {
249       coord.rpcConnectionFailure("Can't reach controller.", e);
250     } catch (IllegalArgumentException e) {
251       throw new ForeignException(getName(), e);
252     }
253   }
254 
255   /**
256    * Sends a message to all members that the global barrier condition has been satisfied.  This
257    * should only be executed after all members have completed its
258    * {@link Subprocedure#acquireBarrier()} call successfully.  This triggers the member
259    * {@link Subprocedure#insideBarrier} method.
260    * @throws ForeignException
261    */
262   public void sendGlobalBarrierReached() throws ForeignException {
263     try {
264       // trigger to have member run {@link Subprocedure#insideBarrier}
265       coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
266     } catch (IOException e) {
267       coord.rpcConnectionFailure("Can't reach controller.", e);
268     }
269   }
270 
271   /**
272    * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
273    * After this executes, the coordinator can assume that any state resources about this barrier
274    * procedure state has been released.
275    */
276   public void sendGlobalBarrierComplete() {
277     LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
278     try {
279       coord.getRpcs().resetMembers(this);
280     } catch (IOException e) {
281       coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
282     }
283   }
284 
285   //
286   // Call backs from other external processes.
287   //
288 
289   /**
290    * Call back triggered by an individual member upon successful local barrier acquisition
291    * @param member
292    */
293   public void barrierAcquiredByMember(String member) {
294     LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
295         + "' on coordinator");
296     if (this.acquiringMembers.contains(member)) {
297       synchronized (joinBarrierLock) {
298         if (this.acquiringMembers.remove(member)) {
299           this.inBarrierMembers.add(member);
300           acquiredBarrierLatch.countDown();
301         }
302       }
303       LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
304     } else {
305       LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
306           " Continuing on.");
307     }
308   }
309 
310   /**
311    * Call back triggered by a individual member upon successful local in-barrier execution and
312    * release
313    * @param member
314    */
315   public void barrierReleasedByMember(String member) {
316     boolean removed = false;
317     synchronized (joinBarrierLock) {
318       removed = this.inBarrierMembers.remove(member);
319       if (removed) {
320         releasedBarrierLatch.countDown();
321       }
322     }
323     if (removed) {
324       LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
325           + "', counting down latch.  Waiting for " + releasedBarrierLatch.getCount()
326           + " more");
327     } else {
328       LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
329           + "', but we weren't waiting on it to release!");
330     }
331   }
332 
333   /**
334    * Waits until the entire procedure has globally completed, or has been aborted.  If an
335    * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
336    * yet.
337    * @throws ForeignException
338    * @throws InterruptedException
339    */
340   public void waitForCompleted() throws ForeignException, InterruptedException {
341     waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
342   }
343 
344   /**
345    * A callback that handles incoming ForeignExceptions.
346    */
347   @Override
348   public void receive(ForeignException e) {
349     monitor.receive(e);
350   }
351 
352   /**
353    * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
354    * check for errors
355    * @param latch latch to wait on
356    * @param monitor monitor to check for errors while waiting
357    * @param wakeFrequency frequency to wake up and check for errors (in
358    *          {@link TimeUnit#MILLISECONDS})
359    * @param latchDescription description of the latch, for logging
360    * @throws ForeignException type of error the monitor can throw, if the task fails
361    * @throws InterruptedException if we are interrupted while waiting on latch
362    */
363   public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
364       long wakeFrequency, String latchDescription) throws ForeignException,
365       InterruptedException {
366     boolean released = false;
367     while (!released) {
368       if (monitor != null) {
369         monitor.rethrowException();
370       }
371       /*
372       ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
373           + wakeFrequency + " ms)"); */
374       released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
375     }
376     // check error again in case an error raised during last wait
377     if (monitor != null) {
378       monitor.rethrowException();
379     }
380   }
381 }