001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.concurrent.Callable;
022import java.util.concurrent.CountDownLatch;
023import org.apache.hadoop.hbase.errorhandling.ForeignException;
024import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
025import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
026import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
027import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.zookeeper.KeeperException;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Distributed procedure member's Subprocedure. A procedure is sarted on a ProcedureCoordinator
035 * which communicates with ProcedureMembers who create and start its part of the Procedure. This sub
036 * part is called a Subprocedure Users should subclass this and implement {@link #acquireBarrier()}
037 * (get local barrier for this member), {@link #insideBarrier()} (execute while globally barriered
038 * and release barrier) and {@link #cleanup(Exception)} (release state associated with
039 * subprocedure.) When submitted to a ProcedureMember, the call method is executed in a separate
040 * thread. Latches are use too block its progress and trigger continuations when barrier conditions
041 * are met. Exception that makes it out of calls to {@link #acquireBarrier()} or
042 * {@link #insideBarrier()} gets converted into {@link ForeignException}, which will get propagated
043 * to the {@link ProcedureCoordinator}. There is a category of procedure (ex: online-snapshots), and
044 * a user-specified instance-specific barrierName. (ex: snapshot121126).
045 */
046@InterfaceAudience.Private
047abstract public class Subprocedure implements Callable<Void> {
048  private static final Logger LOG = LoggerFactory.getLogger(Subprocedure.class);
049
050  // Name of the procedure
051  final private String barrierName;
052
053  //
054  // Execution state
055  //
056
057  /** wait on before allowing the in barrier phase to proceed */
058  private final CountDownLatch inGlobalBarrier;
059  /** counted down when the Subprocedure has completed */
060  private final CountDownLatch releasedLocalBarrier;
061
062  //
063  // Error handling
064  //
065  /** monitor to check for errors */
066  protected final ForeignExceptionDispatcher monitor;
067  /** frequency to check for errors (ms) */
068  protected final long wakeFrequency;
069  protected final TimeoutExceptionInjector executionTimeoutTimer;
070  protected final ProcedureMemberRpcs rpcs;
071
072  private volatile boolean complete = false;
073
074  /**
075   * @param member        reference to the member managing this subprocedure
076   * @param procName      name of the procedure this subprocedure is associated with
077   * @param monitor       notified if there is an error in the subprocedure
078   * @param wakeFrequency time in millis to wake to check if there is an error via the monitor (in
079   *                      milliseconds).
080   * @param timeout       time in millis that will trigger a subprocedure abort if it has not
081   *                      completed
082   */
083  public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor,
084    long wakeFrequency, long timeout) {
085    // Asserts should be caught during unit testing
086    assert member != null : "procedure member should be non-null";
087    assert member.getRpcs() != null : "rpc handlers should be non-null";
088    assert procName != null : "procedure name should be non-null";
089    assert monitor != null : "monitor should be non-null";
090
091    // Default to a very large timeout
092    this.rpcs = member.getRpcs();
093    this.barrierName = procName;
094    this.monitor = monitor;
095    // forward any failures to coordinator. Since this is a dispatcher, resend loops should not be
096    // possible.
097    this.monitor.addListener(new ForeignExceptionListener() {
098      @Override
099      public void receive(ForeignException ee) {
100        // if this is a notification from a remote source, just log
101        if (ee.isRemote()) {
102          LOG.debug("Was remote foreign exception, not redispatching error", ee);
103          return;
104        }
105        // if this is a local KeeperException, don't attempt to notify other members
106        if (ee.getCause() instanceof KeeperException) {
107          LOG.debug("Was KeeperException, not redispatching error", ee);
108          return;
109        }
110        // if it is other local error, then send it to the coordinator
111        try {
112          rpcs.sendMemberAborted(Subprocedure.this, ee);
113        } catch (IOException e) {
114          // this will fail all the running procedures, since the connection is down
115          LOG.error("Can't reach controller, not propagating error", e);
116        }
117      }
118    });
119
120    this.wakeFrequency = wakeFrequency;
121    this.inGlobalBarrier = new CountDownLatch(1);
122    this.releasedLocalBarrier = new CountDownLatch(1);
123
124    // accept error from timer thread, this needs to be started.
125    this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout);
126  }
127
128  public String getName() {
129    return barrierName;
130  }
131
132  public String getMemberName() {
133    return rpcs.getMemberName();
134  }
135
136  private void rethrowException() throws ForeignException {
137    monitor.rethrowException();
138  }
139
140  /**
141   * Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods while
142   * keeping some state for other threads to access. This would normally be executed by the
143   * ProcedureMember when a acquire message comes from the coordinator. Rpcs are used to spend
144   * message back to the coordinator after different phases are executed. Any exceptions caught
145   * during the execution (except for InterruptedException) get converted and propagated to
146   * coordinator via {@link ProcedureMemberRpcs#sendMemberAborted( Subprocedure, ForeignException)}.
147   */
148  @SuppressWarnings("finally")
149  @Override
150  final public Void call() {
151    LOG.debug("Starting subprocedure '" + barrierName + "' with timeout "
152      + executionTimeoutTimer.getMaxTime() + "ms");
153    // start the execution timeout timer
154    executionTimeoutTimer.start();
155
156    try {
157      // start by checking for error first
158      rethrowException();
159      LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
160      acquireBarrier();
161      LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
162      rethrowException();
163
164      // vote yes to coordinator about being prepared
165      rpcs.sendMemberAcquired(this);
166      LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on"
167        + " 'reached' or 'abort' from coordinator");
168
169      // wait for the procedure to reach global barrier before proceding
170      waitForReachedGlobalBarrier();
171      rethrowException(); // if Coordinator aborts, will bail from here with exception
172
173      // In traditional 2PC, if a member reaches this state the TX has been committed and the
174      // member is responsible for rolling forward and recovering and completing the subsequent
175      // operations in the case of failure. It cannot rollback.
176      //
177      // This implementation is not 2PC since it can still rollback here, and thus has different
178      // semantics.
179
180      LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
181      byte[] dataToCoordinator = insideBarrier();
182      LOG.debug("Subprocedure '" + barrierName + "' locally completed");
183      rethrowException();
184
185      // Ack that the member has executed and released local barrier
186      rpcs.sendMemberCompleted(this, dataToCoordinator);
187      LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
188
189      // make sure we didn't get an external exception
190      rethrowException();
191    } catch (Exception e) {
192      String msg = null;
193      if (e instanceof InterruptedException) {
194        msg = "Procedure '" + barrierName + "' aborting due to interrupt!"
195          + " Likely due to pool shutdown.";
196        Thread.currentThread().interrupt();
197      } else if (e instanceof ForeignException) {
198        msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!";
199      } else {
200        msg = "Subprocedure '" + barrierName + "' failed!";
201      }
202      cancel(msg, e);
203
204      LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");
205      cleanup(e);
206    } finally {
207      releasedLocalBarrier.countDown();
208
209      // tell the timer we are done, if we get here successfully
210      executionTimeoutTimer.complete();
211      complete = true;
212      LOG.debug("Subprocedure '" + barrierName + "' completed.");
213      return null;
214    }
215  }
216
217  boolean isComplete() {
218    return complete;
219  }
220
221  /**
222   * exposed for testing.
223   */
224  ForeignExceptionSnare getErrorCheckable() {
225    return this.monitor;
226  }
227
228  /**
229   * The implementation of this method should gather and hold required resources (locks, disk space,
230   * etc) to satisfy the Procedures barrier condition. For example, this would be where to make all
231   * the regions on a RS on the quiescent for an procedure that required all regions to be globally
232   * quiesed. Users should override this method. If a quiescent is not required, this is overkill
233   * but can still be used to execute a procedure on all members and to propagate any exceptions.
234   */
235  abstract public void acquireBarrier() throws ForeignException;
236
237  /**
238   * The implementation of this method should act with the assumption that the barrier condition has
239   * been satisfied. Continuing the previous example, a condition could be that all RS's globally
240   * have been quiesced, and procedures that require this precondition could be implemented here.
241   * The implementation should also collect the result of the subprocedure as data to be returned to
242   * the coordinator upon successful completion. Users should override this method.
243   * @return the data the subprocedure wants to return to coordinator side.
244   */
245  abstract public byte[] insideBarrier() throws ForeignException;
246
247  /**
248   * Users should override this method. This implementation of this method should rollback and
249   * cleanup any temporary or partially completed state that the {@link #acquireBarrier()} may have
250   * created.
251   */
252  abstract public void cleanup(Exception e);
253
254  /**
255   * Method to cancel the Subprocedure by injecting an exception from and external source.
256   */
257  public void cancel(String msg, Throwable cause) {
258    LOG.error(msg, cause);
259    complete = true;
260    if (cause instanceof ForeignException) {
261      monitor.receive((ForeignException) cause);
262    } else {
263      monitor.receive(new ForeignException(getMemberName(), cause));
264    }
265  }
266
267  /**
268   * Callback for the member rpcs to call when the global barrier has been reached. This unblocks
269   * the main subprocedure exectuion thread so that the Subprocedure's {@link #insideBarrier()}
270   * method can be run.
271   */
272  public void receiveReachedGlobalBarrier() {
273    inGlobalBarrier.countDown();
274  }
275
276  //
277  // Subprocedure Internal State interface
278  //
279
280  /**
281   * Wait for the reached global barrier notification. Package visibility for testing
282   */
283  void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException {
284    Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency,
285      barrierName + ":remote acquired");
286  }
287
288  /**
289   * Waits until the entire procedure has globally completed, or has been aborted.
290   */
291  public void waitForLocallyCompleted() throws ForeignException, InterruptedException {
292    Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency,
293      barrierName + ":completed");
294  }
295
296  /**
297   * Empty Subprocedure for testing. Must be public for stubbing used in testing to work.
298   */
299  public static class SubprocedureImpl extends Subprocedure {
300
301    public SubprocedureImpl(ProcedureMember member, String opName,
302      ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) {
303      super(member, opName, monitor, wakeFrequency, timeout);
304    }
305
306    @Override
307    public void acquireBarrier() throws ForeignException {
308    }
309
310    @Override
311    public byte[] insideBarrier() throws ForeignException {
312      return new byte[0];
313    }
314
315    @Override
316    public void cleanup(Exception e) {
317    }
318  }
319}