View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.concurrent.Callable;
22  import java.util.concurrent.CountDownLatch;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.errorhandling.ForeignException;
27  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
28  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
29  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
30  import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
31  
32  /**
33   * Distributed procedure member's Subprocedure.  A procedure is sarted on a ProcedureCoordinator
34   * which communicates with ProcedureMembers who create and start its part of the Procedure.  This
35   * sub part is called a Subprocedure
36   *
37   * Users should subclass this and implement {@link #acquireBarrier()} (get local barrier for this
38   * member), {@link #insideBarrier()} (execute while globally barriered and release barrier) and
39   * {@link #cleanup(Exception)} (release state associated with subprocedure.)
40   *
41   * When submitted to a ProcedureMemeber, the call method is executed in a separate thread.
42   * Latches are use too block its progress and trigger continuations when barrier conditions are
43   * met.
44   *
45   * Exception that makes it out of calls to {@link #acquireBarrier()} or {@link #insideBarrier()}
46   * gets converted into {@link ForeignException}, which will get propagated to the
47   * {@link ProcedureCoordinator}.
48   *
49   * There is a category of procedure (ex: online-snapshots), and a user-specified instance-specific
50   * barrierName. (ex: snapshot121126).
51   */
52  abstract public class Subprocedure implements Callable<Void> {
53    private static final Log LOG = LogFactory.getLog(Subprocedure.class);
54  
55    // Name of the procedure
56    final private String barrierName;
57  
58    //
59    // Execution state
60    //
61  
62    /** wait on before allowing the in barrier phase to proceed */
63    private final CountDownLatch inGlobalBarrier;
64    /** counted down when the Subprocedure has completed */
65    private final CountDownLatch releasedLocalBarrier;
66  
67    //
68    // Error handling
69    //
70    /** monitor to check for errors */
71    protected final ForeignExceptionDispatcher monitor;
72    /** frequency to check for errors (ms) */
73    protected final long wakeFrequency;
74    protected final TimeoutExceptionInjector executionTimeoutTimer;
75    protected final ProcedureMemberRpcs rpcs;
76  
77    private volatile boolean complete = false;
78  
79    /**
80     * @param member reference to the member managing this subprocedure
81     * @param procName name of the procedure this subprocedure is associated with
82     * @param monitor notified if there is an error in the subprocedure
83     * @param wakeFrequency time in millis to wake to check if there is an error via the monitor (in
84     *          milliseconds).
85     * @param timeout time in millis that will trigger a subprocedure abort if it has not completed
86     */
87    public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor,
88        long wakeFrequency, long timeout) {
89      // Asserts should be caught during unit testing
90      assert member != null : "procedure member should be non-null";
91      assert member.getRpcs() != null : "rpc handlers should be non-null";
92      assert procName != null : "procedure name should be non-null";
93      assert monitor != null : "monitor should be non-null";
94  
95      // Default to a very large timeout
96      this.rpcs = member.getRpcs();
97      this.barrierName = procName;
98      this.monitor = monitor;
99      // forward any failures to coordinator.  Since this is a dispatcher, resend loops should not be
100     // possible.
101     this.monitor.addListener(new ForeignExceptionListener() {
102       @Override
103       public void receive(ForeignException ee) {
104         // if this is a notification from a remote source, just log
105         if (ee.isRemote()) {
106           LOG.debug("Was remote foreign exception, not redispatching error", ee);
107           return;
108         }
109 
110         // if it is local, then send it to the coordinator
111         try {
112           rpcs.sendMemberAborted(Subprocedure.this, ee);
113         } catch (IOException e) {
114           // this will fail all the running procedures, since the connection is down
115           LOG.error("Can't reach controller, not propagating error", e);
116         }
117       }
118     });
119 
120     this.wakeFrequency = wakeFrequency;
121     this.inGlobalBarrier = new CountDownLatch(1);
122     this.releasedLocalBarrier = new CountDownLatch(1);
123 
124     // accept error from timer thread, this needs to be started.
125     this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout);
126   }
127 
128   public String getName() {
129      return barrierName;
130   }
131 
132   public String getMemberName() {
133     return rpcs.getMemberName();
134   }
135 
136   private void rethrowException() throws ForeignException {
137     monitor.rethrowException();
138   }
139 
140   /**
141    * Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods
142    * while keeping some state for other threads to access.
143    *
144    * This would normally be executed by the ProcedureMemeber when a acquire message comes from the
145    * coordinator.  Rpcs are used to spend message back to the coordinator after different phases
146    * are executed.  Any exceptions caught during the execution (except for InterrupedException) get
147    * converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendMemberAborted(
148    * Subprocedure, ForeignException)}.
149    */
150   @SuppressWarnings("finally")
151   final public Void call() {
152     LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " +
153         executionTimeoutTimer.getMaxTime() + "ms");
154     // start the execution timeout timer
155     executionTimeoutTimer.start();
156 
157     try {
158       // start by checking for error first
159       rethrowException();
160       LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
161       acquireBarrier();
162       LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
163       rethrowException();
164 
165       // vote yes to coordinator about being prepared
166       rpcs.sendMemberAcquired(this);
167       LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" +
168           " 'reached' or 'abort' from coordinator");
169 
170       // wait for the procedure to reach global barrier before proceding
171       waitForReachedGlobalBarrier();
172       rethrowException(); // if Coordinator aborts, will bail from here with exception
173 
174       // In traditional 2PC, if a member reaches this state the TX has been committed and the
175       // member is responsible for rolling forward and recovering and completing the subsequent
176       // operations in the case of failure.  It cannot rollback.
177       //
178       // This implementation is not 2PC since it can still rollback here, and thus has different
179       // semantics.
180 
181       LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
182       byte[] dataToCoordinator = insideBarrier();
183       LOG.debug("Subprocedure '" + barrierName + "' locally completed");
184       rethrowException();
185 
186       // Ack that the member has executed and released local barrier
187       rpcs.sendMemberCompleted(this, dataToCoordinator);
188       LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
189 
190       // make sure we didn't get an external exception
191       rethrowException();
192     } catch (Exception e) {
193       String msg = null;
194       if (e instanceof InterruptedException) {
195         msg = "Procedure '" + barrierName + "' aborting due to interrupt!" +
196             " Likely due to pool shutdown.";
197         Thread.currentThread().interrupt();
198       } else if (e instanceof ForeignException) {
199         msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!";
200       } else {
201         msg = "Subprocedure '" + barrierName + "' failed!";
202       }
203       cancel(msg, e);
204 
205       LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");
206       cleanup(e);
207     } finally {
208       releasedLocalBarrier.countDown();
209 
210       // tell the timer we are done, if we get here successfully
211       executionTimeoutTimer.complete();
212       complete = true;
213       LOG.debug("Subprocedure '" + barrierName + "' completed.");
214       return null;
215     }
216   }
217 
218   boolean isComplete() {
219     return complete;
220   }
221 
222   /**
223    * exposed for testing.
224    */
225   ForeignExceptionSnare getErrorCheckable() {
226     return this.monitor;
227   }
228 
229   /**
230    * The implementation of this method should gather and hold required resources (locks, disk
231    * space, etc) to satisfy the Procedures barrier condition.  For example, this would be where
232    * to make all the regions on a RS on the quiescent for an procedure that required all regions
233    * to be globally quiesed.
234    *
235    * Users should override this method.  If a quiescent is not required, this is overkill but
236    * can still be used to execute a procedure on all members and to propagate any exceptions.
237    *
238    * @throws ForeignException
239    */
240   abstract public void acquireBarrier() throws ForeignException;
241 
242   /**
243    * The implementation of this method should act with the assumption that the barrier condition
244    * has been satisfied.  Continuing the previous example, a condition could be that all RS's
245    * globally have been quiesced, and procedures that require this precondition could be
246    * implemented here.
247    * The implementation should also collect the result of the subprocedure as data to be returned
248    * to the coordinator upon successful completion.
249    * Users should override this method.
250    * @return the data the subprocedure wants to return to coordinator side.
251    * @throws ForeignException
252    */
253   abstract public byte[] insideBarrier() throws ForeignException;
254 
255   /**
256    * Users should override this method. This implementation of this method should rollback and
257    * cleanup any temporary or partially completed state that the {@link #acquireBarrier()} may have
258    * created.
259    * @param e
260    */
261   abstract public void cleanup(Exception e);
262 
263   /**
264    * Method to cancel the Subprocedure by injecting an exception from and external source.
265    * @param cause
266    */
267   public void cancel(String msg, Throwable cause) {
268     LOG.error(msg, cause);
269     complete = true;
270     if (cause instanceof ForeignException) {
271       monitor.receive((ForeignException) cause);
272     } else {
273       monitor.receive(new ForeignException(getMemberName(), cause));
274     }
275   }
276 
277   /**
278    * Callback for the member rpcs to call when the global barrier has been reached.  This
279    * unblocks the main subprocedure exectuion thread so that the Subprocedure's
280    * {@link #insideBarrier()} method can be run.
281    */
282   public void receiveReachedGlobalBarrier() {
283     inGlobalBarrier.countDown();
284   }
285 
286   //
287   // Subprocedure Internal State interface
288   //
289 
290   /**
291    * Wait for the reached global barrier notification.
292    *
293    * Package visibility for testing
294    *
295    * @throws ForeignException
296    * @throws InterruptedException
297    */
298   void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException {
299     Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency,
300         barrierName + ":remote acquired");
301   }
302 
303   /**
304    * Waits until the entire procedure has globally completed, or has been aborted.
305    * @throws ForeignException
306    * @throws InterruptedException
307    */
308   public void waitForLocallyCompleted() throws ForeignException, InterruptedException {
309     Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency,
310         barrierName + ":completed");
311   }
312 
313   /**
314    * Empty Subprocedure for testing.
315    *
316    * Must be public for stubbing used in testing to work.
317    */
318   public static class SubprocedureImpl extends Subprocedure {
319 
320     public SubprocedureImpl(ProcedureMember member, String opName,
321         ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) {
322       super(member, opName, monitor, wakeFrequency, timeout);
323     }
324 
325     @Override
326     public void acquireBarrier() throws ForeignException {}
327 
328     @Override
329     public byte[] insideBarrier() throws ForeignException {
330       return new byte[0];
331     }
332 
333     @Override
334     public void cleanup(Exception e) {}
335   };
336 }