001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.concurrent.Callable;
022import java.util.concurrent.CountDownLatch;
023import org.apache.hadoop.hbase.errorhandling.ForeignException;
024import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
025import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
026import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
027import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.zookeeper.KeeperException;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Distributed procedure member's Subprocedure.  A procedure is sarted on a ProcedureCoordinator
035 * which communicates with ProcedureMembers who create and start its part of the Procedure.  This
036 * sub part is called a Subprocedure
037 *
038 * Users should subclass this and implement {@link #acquireBarrier()} (get local barrier for this
039 * member), {@link #insideBarrier()} (execute while globally barriered and release barrier) and
040 * {@link #cleanup(Exception)} (release state associated with subprocedure.)
041 *
042 * When submitted to a ProcedureMemeber, the call method is executed in a separate thread.
043 * Latches are use too block its progress and trigger continuations when barrier conditions are
044 * met.
045 *
046 * Exception that makes it out of calls to {@link #acquireBarrier()} or {@link #insideBarrier()}
047 * gets converted into {@link ForeignException}, which will get propagated to the
048 * {@link ProcedureCoordinator}.
049 *
050 * There is a category of procedure (ex: online-snapshots), and a user-specified instance-specific
051 * barrierName. (ex: snapshot121126).
052 */
053@InterfaceAudience.Private
054abstract public class Subprocedure implements Callable<Void> {
055  private static final Logger LOG = LoggerFactory.getLogger(Subprocedure.class);
056
057  // Name of the procedure
058  final private String barrierName;
059
060  //
061  // Execution state
062  //
063
064  /** wait on before allowing the in barrier phase to proceed */
065  private final CountDownLatch inGlobalBarrier;
066  /** counted down when the Subprocedure has completed */
067  private final CountDownLatch releasedLocalBarrier;
068
069  //
070  // Error handling
071  //
072  /** monitor to check for errors */
073  protected final ForeignExceptionDispatcher monitor;
074  /** frequency to check for errors (ms) */
075  protected final long wakeFrequency;
076  protected final TimeoutExceptionInjector executionTimeoutTimer;
077  protected final ProcedureMemberRpcs rpcs;
078
079  private volatile boolean complete = false;
080
081  /**
082   * @param member reference to the member managing this subprocedure
083   * @param procName name of the procedure this subprocedure is associated with
084   * @param monitor notified if there is an error in the subprocedure
085   * @param wakeFrequency time in millis to wake to check if there is an error via the monitor (in
086   *          milliseconds).
087   * @param timeout time in millis that will trigger a subprocedure abort if it has not completed
088   */
089  public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor,
090      long wakeFrequency, long timeout) {
091    // Asserts should be caught during unit testing
092    assert member != null : "procedure member should be non-null";
093    assert member.getRpcs() != null : "rpc handlers should be non-null";
094    assert procName != null : "procedure name should be non-null";
095    assert monitor != null : "monitor should be non-null";
096
097    // Default to a very large timeout
098    this.rpcs = member.getRpcs();
099    this.barrierName = procName;
100    this.monitor = monitor;
101    // forward any failures to coordinator.  Since this is a dispatcher, resend loops should not be
102    // possible.
103    this.monitor.addListener(new ForeignExceptionListener() {
104      @Override
105      public void receive(ForeignException ee) {
106        // if this is a notification from a remote source, just log
107        if (ee.isRemote()) {
108          LOG.debug("Was remote foreign exception, not redispatching error", ee);
109          return;
110        }
111        // if this is a local KeeperException, don't attempt to notify other members
112        if (ee.getCause() instanceof KeeperException) {
113          LOG.debug("Was KeeperException, not redispatching error", ee);
114          return;
115        }
116        // if it is other local error, then send it to the coordinator
117        try {
118          rpcs.sendMemberAborted(Subprocedure.this, ee);
119        } catch (IOException e) {
120          // this will fail all the running procedures, since the connection is down
121          LOG.error("Can't reach controller, not propagating error", e);
122        }
123      }
124    });
125
126    this.wakeFrequency = wakeFrequency;
127    this.inGlobalBarrier = new CountDownLatch(1);
128    this.releasedLocalBarrier = new CountDownLatch(1);
129
130    // accept error from timer thread, this needs to be started.
131    this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout);
132  }
133
134  public String getName() {
135     return barrierName;
136  }
137
138  public String getMemberName() {
139    return rpcs.getMemberName();
140  }
141
142  private void rethrowException() throws ForeignException {
143    monitor.rethrowException();
144  }
145
146  /**
147   * Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods
148   * while keeping some state for other threads to access.
149   *
150   * This would normally be executed by the ProcedureMemeber when a acquire message comes from the
151   * coordinator.  Rpcs are used to spend message back to the coordinator after different phases
152   * are executed.  Any exceptions caught during the execution (except for InterruptedException) get
153   * converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendMemberAborted(
154   * Subprocedure, ForeignException)}.
155   */
156  @SuppressWarnings("finally")
157  @Override
158  final public Void call() {
159    LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " +
160        executionTimeoutTimer.getMaxTime() + "ms");
161    // start the execution timeout timer
162    executionTimeoutTimer.start();
163
164    try {
165      // start by checking for error first
166      rethrowException();
167      LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
168      acquireBarrier();
169      LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
170      rethrowException();
171
172      // vote yes to coordinator about being prepared
173      rpcs.sendMemberAcquired(this);
174      LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" +
175          " 'reached' or 'abort' from coordinator");
176
177      // wait for the procedure to reach global barrier before proceding
178      waitForReachedGlobalBarrier();
179      rethrowException(); // if Coordinator aborts, will bail from here with exception
180
181      // In traditional 2PC, if a member reaches this state the TX has been committed and the
182      // member is responsible for rolling forward and recovering and completing the subsequent
183      // operations in the case of failure.  It cannot rollback.
184      //
185      // This implementation is not 2PC since it can still rollback here, and thus has different
186      // semantics.
187
188      LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
189      byte[] dataToCoordinator = insideBarrier();
190      LOG.debug("Subprocedure '" + barrierName + "' locally completed");
191      rethrowException();
192
193      // Ack that the member has executed and released local barrier
194      rpcs.sendMemberCompleted(this, dataToCoordinator);
195      LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
196
197      // make sure we didn't get an external exception
198      rethrowException();
199    } catch (Exception e) {
200      String msg = null;
201      if (e instanceof InterruptedException) {
202        msg = "Procedure '" + barrierName + "' aborting due to interrupt!" +
203            " Likely due to pool shutdown.";
204        Thread.currentThread().interrupt();
205      } else if (e instanceof ForeignException) {
206        msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!";
207      } else {
208        msg = "Subprocedure '" + barrierName + "' failed!";
209      }
210      cancel(msg, e);
211
212      LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");
213      cleanup(e);
214    } finally {
215      releasedLocalBarrier.countDown();
216
217      // tell the timer we are done, if we get here successfully
218      executionTimeoutTimer.complete();
219      complete = true;
220      LOG.debug("Subprocedure '" + barrierName + "' completed.");
221      return null;
222    }
223  }
224
225  boolean isComplete() {
226    return complete;
227  }
228
229  /**
230   * exposed for testing.
231   */
232  ForeignExceptionSnare getErrorCheckable() {
233    return this.monitor;
234  }
235
236  /**
237   * The implementation of this method should gather and hold required resources (locks, disk
238   * space, etc) to satisfy the Procedures barrier condition.  For example, this would be where
239   * to make all the regions on a RS on the quiescent for an procedure that required all regions
240   * to be globally quiesed.
241   *
242   * Users should override this method.  If a quiescent is not required, this is overkill but
243   * can still be used to execute a procedure on all members and to propagate any exceptions.
244   *
245   * @throws ForeignException
246   */
247  abstract public void acquireBarrier() throws ForeignException;
248
249  /**
250   * The implementation of this method should act with the assumption that the barrier condition
251   * has been satisfied.  Continuing the previous example, a condition could be that all RS's
252   * globally have been quiesced, and procedures that require this precondition could be
253   * implemented here.
254   * The implementation should also collect the result of the subprocedure as data to be returned
255   * to the coordinator upon successful completion.
256   * Users should override this method.
257   * @return the data the subprocedure wants to return to coordinator side.
258   * @throws ForeignException
259   */
260  abstract public byte[] insideBarrier() throws ForeignException;
261
262  /**
263   * Users should override this method. This implementation of this method should rollback and
264   * cleanup any temporary or partially completed state that the {@link #acquireBarrier()} may have
265   * created.
266   * @param e
267   */
268  abstract public void cleanup(Exception e);
269
270  /**
271   * Method to cancel the Subprocedure by injecting an exception from and external source.
272   * @param cause
273   */
274  public void cancel(String msg, Throwable cause) {
275    LOG.error(msg, cause);
276    complete = true;
277    if (cause instanceof ForeignException) {
278      monitor.receive((ForeignException) cause);
279    } else {
280      monitor.receive(new ForeignException(getMemberName(), cause));
281    }
282  }
283
284  /**
285   * Callback for the member rpcs to call when the global barrier has been reached.  This
286   * unblocks the main subprocedure exectuion thread so that the Subprocedure's
287   * {@link #insideBarrier()} method can be run.
288   */
289  public void receiveReachedGlobalBarrier() {
290    inGlobalBarrier.countDown();
291  }
292
293  //
294  // Subprocedure Internal State interface
295  //
296
297  /**
298   * Wait for the reached global barrier notification.
299   *
300   * Package visibility for testing
301   *
302   * @throws ForeignException
303   * @throws InterruptedException
304   */
305  void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException {
306    Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency,
307        barrierName + ":remote acquired");
308  }
309
310  /**
311   * Waits until the entire procedure has globally completed, or has been aborted.
312   * @throws ForeignException
313   * @throws InterruptedException
314   */
315  public void waitForLocallyCompleted() throws ForeignException, InterruptedException {
316    Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency,
317        barrierName + ":completed");
318  }
319
320  /**
321   * Empty Subprocedure for testing.
322   *
323   * Must be public for stubbing used in testing to work.
324   */
325  public static class SubprocedureImpl extends Subprocedure {
326
327    public SubprocedureImpl(ProcedureMember member, String opName,
328        ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) {
329      super(member, opName, monitor, wakeFrequency, timeout);
330    }
331
332    @Override
333    public void acquireBarrier() throws ForeignException {}
334
335    @Override
336    public byte[] insideBarrier() throws ForeignException {
337      return new byte[0];
338    }
339
340    @Override
341    public void cleanup(Exception e) {}
342  };
343}