View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.errorhandling.ForeignException;
32  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
33  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
34  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
35  import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
36  
37  import com.google.common.collect.Lists;
38  
39  /**
40   * A globally-barriered distributed procedure.  This class encapsulates state and methods for
41   * tracking and managing a distributed procedure, as well as aborting if any member encounters
42   * a problem or if a cancellation is requested.
43   * <p>
44   * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
45   * method.  The procedure contacts all members and waits for all subprocedures to execute
46   * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
47   * send acquisition info back to the coordinator.  If all acquisitions at subprocedures succeed,
48   * the coordinator then will call {@link #sendGlobalBarrierReached()}.  This notifies members to
49   * execute the {@link Subprocedure#insideBarrier()} method.  The procedure is blocked until all
50   * {@link Subprocedure#insideBarrier} executions complete at the members.  When
51   * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
52   * the coordinator.  Once all members complete, the coordinator calls
53   * {@link #sendGlobalBarrierComplete()}.
54   * <p>
55   * If errors are encountered remotely, they are forwarded to the coordinator, and
56   * {@link Subprocedure#cleanup(Exception)} is called.
57   * <p>
58   * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
59   * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
60   * a {@link ForeignException} to abort the procedure.  This is particularly useful for situations
61   * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
62   * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
63   * <p>
64   * Users should generally not directly create or subclass instances of this.  They are created
65   * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
66   * String, byte[], List)}.
67   */
68  @InterfaceAudience.Private
69  public class Procedure implements Callable<Void>, ForeignExceptionListener {
70    private static final Log LOG = LogFactory.getLog(Procedure.class);
71  
72    //
73    // Arguments and naming
74    //
75  
76    // Name of the procedure
77    final private String procName;
78    // Arguments for this procedure execution
79    final private byte[] args;
80  
81    //
82    // Execution State
83    //
84    /** latch for waiting until all members have acquire in barrier state */
85    final CountDownLatch acquiredBarrierLatch;
86    /** latch for waiting until all members have executed and released their in barrier state */
87    final CountDownLatch releasedBarrierLatch;
88    /** latch for waiting until a procedure has completed */
89    final CountDownLatch completedLatch;
90    /** monitor to check for errors */
91    private final ForeignExceptionDispatcher monitor;
92  
93    //
94    // Execution Timeout Handling.
95    //
96  
97    /** frequency to check for errors (ms) */
98    protected final long wakeFrequency;
99    protected final TimeoutExceptionInjector timeoutInjector;
100 
101   //
102   // Members' and Coordinator's state
103   //
104 
105   /** lock to prevent nodes from acquiring and then releasing before we can track them */
106   private Object joinBarrierLock = new Object();
107   private final List<String> acquiringMembers;
108   private final List<String> inBarrierMembers;
109   private final HashMap<String, byte[]> dataFromFinishedMembers;
110   private ProcedureCoordinator coord;
111 
112   /**
113    * Creates a procedure. (FOR TESTING)
114    *
115    * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
116    * @param coord coordinator to call back to for general errors (e.g.
117    *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
118    * @param monitor error monitor to check for external errors
119    * @param wakeFreq frequency to check for errors while waiting
120    * @param timeout amount of time to allow the procedure to run before cancelling
121    * @param procName name of the procedure instance
122    * @param args argument data associated with the procedure instance
123    * @param expectedMembers names of the expected members
124    */
125   public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
126       long timeout, String procName, byte[] args, List<String> expectedMembers) {
127     this.coord = coord;
128     this.acquiringMembers = new ArrayList<String>(expectedMembers);
129     this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
130     this.dataFromFinishedMembers = new HashMap<String, byte[]>();
131     this.procName = procName;
132     this.args = args;
133     this.monitor = monitor;
134     this.wakeFrequency = wakeFreq;
135 
136     int count = expectedMembers.size();
137     this.acquiredBarrierLatch = new CountDownLatch(count);
138     this.releasedBarrierLatch = new CountDownLatch(count);
139     this.completedLatch = new CountDownLatch(1);
140     this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
141   }
142 
143   /**
144    * Create a procedure.
145    *
146    * Users should generally not directly create instances of this.  They are created them
147    * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
148    * String, byte[], List)}}
149    *
150    * @param coord coordinator to call back to for general errors (e.g.
151    *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
152    * @param wakeFreq frequency to check for errors while waiting
153    * @param timeout amount of time to allow the procedure to run before cancelling
154    * @param procName name of the procedure instance
155    * @param args argument data associated with the procedure instance
156    * @param expectedMembers names of the expected members
157    */
158   public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
159       String procName, byte[] args, List<String> expectedMembers) {
160     this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
161         expectedMembers);
162   }
163 
164   public String getName() {
165     return procName;
166   }
167 
168   /**
169    * @return String of the procedure members both trying to enter the barrier and already in barrier
170    */
171   public String getStatus() {
172     String waiting, done;
173     synchronized (joinBarrierLock) {
174       waiting = acquiringMembers.toString();
175       done = inBarrierMembers.toString();
176     }
177     return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
178   }
179 
180   /**
181    * Get the ForeignExceptionDispatcher
182    * @return the Procedure's monitor.
183    */
184   public ForeignExceptionDispatcher getErrorMonitor() {
185     return monitor;
186   }
187 
188   /**
189    * This call is the main execution thread of the barriered procedure.  It sends messages and
190    * essentially blocks until all procedure members acquire or later complete but periodically
191    * checks for foreign exceptions.
192    */
193   @Override
194   @SuppressWarnings("finally")
195   final public Void call() {
196     LOG.info("Starting procedure '" + procName + "'");
197     // start the timer
198     timeoutInjector.start();
199 
200     // run the procedure
201     try {
202       // start by checking for error first
203       monitor.rethrowException();
204       LOG.debug("Procedure '" + procName + "' starting 'acquire'");
205       sendGlobalBarrierStart();
206 
207       // wait for all the members to report acquisition
208       LOG.debug("Waiting for all members to 'acquire'");
209       waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
210       monitor.rethrowException();
211 
212       LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
213       sendGlobalBarrierReached();
214 
215       // wait for all members to report barrier release
216       waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
217 
218       // make sure we didn't get an error during in barrier execution and release
219       monitor.rethrowException();
220       LOG.info("Procedure '" + procName + "' execution completed");
221     } catch (Exception e) {
222       if (e instanceof InterruptedException) {
223         Thread.currentThread().interrupt();
224       }
225       String msg = "Procedure '" + procName +"' execution failed!";
226       LOG.error(msg, e);
227       receive(new ForeignException(getName(), e));
228     } finally {
229       LOG.debug("Running finish phase.");
230       sendGlobalBarrierComplete();
231       completedLatch.countDown();
232 
233       // tell the timer we are done, if we get here successfully
234       timeoutInjector.complete();
235       return null;
236     }
237   }
238 
239   /**
240    * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
241    * the {@link Subprocedure#acquireBarrier} step.
242    * @throws ForeignException
243    */
244   public void sendGlobalBarrierStart() throws ForeignException {
245     // start the procedure
246     LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
247     try {
248       // send procedure barrier start to specified list of members. cloning the list to avoid
249       // concurrent modification from the controller setting the prepared nodes
250       coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
251     } catch (IOException e) {
252       coord.rpcConnectionFailure("Can't reach controller.", e);
253     } catch (IllegalArgumentException e) {
254       throw new ForeignException(getName(), e);
255     }
256   }
257 
258   /**
259    * Sends a message to all members that the global barrier condition has been satisfied.  This
260    * should only be executed after all members have completed its
261    * {@link Subprocedure#acquireBarrier()} call successfully.  This triggers the member
262    * {@link Subprocedure#insideBarrier} method.
263    * @throws ForeignException
264    */
265   public void sendGlobalBarrierReached() throws ForeignException {
266     try {
267       // trigger to have member run {@link Subprocedure#insideBarrier}
268       coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
269     } catch (IOException e) {
270       coord.rpcConnectionFailure("Can't reach controller.", e);
271     }
272   }
273 
274   /**
275    * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
276    * After this executes, the coordinator can assume that any state resources about this barrier
277    * procedure state has been released.
278    */
279   public void sendGlobalBarrierComplete() {
280     LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
281     try {
282       coord.getRpcs().resetMembers(this);
283     } catch (IOException e) {
284       coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
285     }
286   }
287 
288   //
289   // Call backs from other external processes.
290   //
291 
292   /**
293    * Call back triggered by an individual member upon successful local barrier acquisition
294    * @param member
295    */
296   public void barrierAcquiredByMember(String member) {
297     LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
298         + "' on coordinator");
299     if (this.acquiringMembers.contains(member)) {
300       synchronized (joinBarrierLock) {
301         if (this.acquiringMembers.remove(member)) {
302           this.inBarrierMembers.add(member);
303           acquiredBarrierLatch.countDown();
304         }
305       }
306       LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
307     } else {
308       LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
309           " Continuing on.");
310     }
311   }
312 
313   /**
314    * Call back triggered by a individual member upon successful local in-barrier execution and
315    * release
316    * @param member
317    * @param dataFromMember
318    */
319   public void barrierReleasedByMember(String member, byte[] dataFromMember) {
320     boolean removed = false;
321     synchronized (joinBarrierLock) {
322       removed = this.inBarrierMembers.remove(member);
323       if (removed) {
324         releasedBarrierLatch.countDown();
325       }
326     }
327     if (removed) {
328       LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
329           + "', counting down latch.  Waiting for " + releasedBarrierLatch.getCount()
330           + " more");
331     } else {
332       LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
333           + "', but we weren't waiting on it to release!");
334     }
335     dataFromFinishedMembers.put(member, dataFromMember);
336   }
337 
338   /**
339    * Waits until the entire procedure has globally completed, or has been aborted.  If an
340    * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
341    * yet.
342    * @throws ForeignException
343    * @throws InterruptedException
344    */
345   public void waitForCompleted() throws ForeignException, InterruptedException {
346     waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
347   }
348 
349   /**
350    * Waits until the entire procedure has globally completed, or has been aborted.  If an
351    * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
352    * yet.
353    * @return data returned from procedure members upon successfully completing subprocedure.
354    * @throws ForeignException
355    * @throws InterruptedException
356    */
357   public HashMap<String, byte[]> waitForCompletedWithRet() throws ForeignException, InterruptedException {
358     waitForCompleted();
359     return dataFromFinishedMembers;
360   }
361 
362   /**
363    * Check if the entire procedure has globally completed, or has been aborted.
364    * @throws ForeignException
365    */
366   public boolean isCompleted() throws ForeignException {
367     // Rethrow exception if any
368     monitor.rethrowException();
369     return (completedLatch.getCount() == 0);
370   }
371 
372   /**
373    * A callback that handles incoming ForeignExceptions.
374    */
375   @Override
376   public void receive(ForeignException e) {
377     monitor.receive(e);
378   }
379 
380   /**
381    * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
382    * check for errors
383    * @param latch latch to wait on
384    * @param monitor monitor to check for errors while waiting
385    * @param wakeFrequency frequency to wake up and check for errors (in
386    *          {@link TimeUnit#MILLISECONDS})
387    * @param latchDescription description of the latch, for logging
388    * @throws ForeignException type of error the monitor can throw, if the task fails
389    * @throws InterruptedException if we are interrupted while waiting on latch
390    */
391   public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
392       long wakeFrequency, String latchDescription) throws ForeignException,
393       InterruptedException {
394     boolean released = false;
395     while (!released) {
396       if (monitor != null) {
397         monitor.rethrowException();
398       }
399       /*
400       ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
401           + wakeFrequency + " ms)"); */
402       released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
403     }
404     // check error again in case an error raised during last wait
405     if (monitor != null) {
406       monitor.rethrowException();
407     }
408   }
409 }