001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.concurrent.Callable; 022import java.util.concurrent.CountDownLatch; 023import org.apache.hadoop.hbase.errorhandling.ForeignException; 024import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 025import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener; 026import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; 027import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.zookeeper.KeeperException; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Distributed procedure member's Subprocedure. A procedure is sarted on a ProcedureCoordinator 035 * which communicates with ProcedureMembers who create and start its part of the Procedure. This 036 * sub part is called a Subprocedure 037 * 038 * Users should subclass this and implement {@link #acquireBarrier()} (get local barrier for this 039 * member), {@link #insideBarrier()} (execute while globally barriered and release barrier) and 040 * {@link #cleanup(Exception)} (release state associated with subprocedure.) 041 * 042 * When submitted to a ProcedureMemeber, the call method is executed in a separate thread. 043 * Latches are use too block its progress and trigger continuations when barrier conditions are 044 * met. 045 * 046 * Exception that makes it out of calls to {@link #acquireBarrier()} or {@link #insideBarrier()} 047 * gets converted into {@link ForeignException}, which will get propagated to the 048 * {@link ProcedureCoordinator}. 049 * 050 * There is a category of procedure (ex: online-snapshots), and a user-specified instance-specific 051 * barrierName. (ex: snapshot121126). 052 */ 053@InterfaceAudience.Private 054abstract public class Subprocedure implements Callable<Void> { 055 private static final Logger LOG = LoggerFactory.getLogger(Subprocedure.class); 056 057 // Name of the procedure 058 final private String barrierName; 059 060 // 061 // Execution state 062 // 063 064 /** wait on before allowing the in barrier phase to proceed */ 065 private final CountDownLatch inGlobalBarrier; 066 /** counted down when the Subprocedure has completed */ 067 private final CountDownLatch releasedLocalBarrier; 068 069 // 070 // Error handling 071 // 072 /** monitor to check for errors */ 073 protected final ForeignExceptionDispatcher monitor; 074 /** frequency to check for errors (ms) */ 075 protected final long wakeFrequency; 076 protected final TimeoutExceptionInjector executionTimeoutTimer; 077 protected final ProcedureMemberRpcs rpcs; 078 079 private volatile boolean complete = false; 080 081 /** 082 * @param member reference to the member managing this subprocedure 083 * @param procName name of the procedure this subprocedure is associated with 084 * @param monitor notified if there is an error in the subprocedure 085 * @param wakeFrequency time in millis to wake to check if there is an error via the monitor (in 086 * milliseconds). 087 * @param timeout time in millis that will trigger a subprocedure abort if it has not completed 088 */ 089 public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor, 090 long wakeFrequency, long timeout) { 091 // Asserts should be caught during unit testing 092 assert member != null : "procedure member should be non-null"; 093 assert member.getRpcs() != null : "rpc handlers should be non-null"; 094 assert procName != null : "procedure name should be non-null"; 095 assert monitor != null : "monitor should be non-null"; 096 097 // Default to a very large timeout 098 this.rpcs = member.getRpcs(); 099 this.barrierName = procName; 100 this.monitor = monitor; 101 // forward any failures to coordinator. Since this is a dispatcher, resend loops should not be 102 // possible. 103 this.monitor.addListener(new ForeignExceptionListener() { 104 @Override 105 public void receive(ForeignException ee) { 106 // if this is a notification from a remote source, just log 107 if (ee.isRemote()) { 108 LOG.debug("Was remote foreign exception, not redispatching error", ee); 109 return; 110 } 111 // if this is a local KeeperException, don't attempt to notify other members 112 if (ee.getCause() instanceof KeeperException) { 113 LOG.debug("Was KeeperException, not redispatching error", ee); 114 return; 115 } 116 // if it is other local error, then send it to the coordinator 117 try { 118 rpcs.sendMemberAborted(Subprocedure.this, ee); 119 } catch (IOException e) { 120 // this will fail all the running procedures, since the connection is down 121 LOG.error("Can't reach controller, not propagating error", e); 122 } 123 } 124 }); 125 126 this.wakeFrequency = wakeFrequency; 127 this.inGlobalBarrier = new CountDownLatch(1); 128 this.releasedLocalBarrier = new CountDownLatch(1); 129 130 // accept error from timer thread, this needs to be started. 131 this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout); 132 } 133 134 public String getName() { 135 return barrierName; 136 } 137 138 public String getMemberName() { 139 return rpcs.getMemberName(); 140 } 141 142 private void rethrowException() throws ForeignException { 143 monitor.rethrowException(); 144 } 145 146 /** 147 * Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods 148 * while keeping some state for other threads to access. 149 * 150 * This would normally be executed by the ProcedureMemeber when a acquire message comes from the 151 * coordinator. Rpcs are used to spend message back to the coordinator after different phases 152 * are executed. Any exceptions caught during the execution (except for InterruptedException) get 153 * converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendMemberAborted( 154 * Subprocedure, ForeignException)}. 155 */ 156 @SuppressWarnings("finally") 157 @Override 158 final public Void call() { 159 LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " + 160 executionTimeoutTimer.getMaxTime() + "ms"); 161 // start the execution timeout timer 162 executionTimeoutTimer.start(); 163 164 try { 165 // start by checking for error first 166 rethrowException(); 167 LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage"); 168 acquireBarrier(); 169 LOG.debug("Subprocedure '" + barrierName + "' locally acquired"); 170 rethrowException(); 171 172 // vote yes to coordinator about being prepared 173 rpcs.sendMemberAcquired(this); 174 LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" + 175 " 'reached' or 'abort' from coordinator"); 176 177 // wait for the procedure to reach global barrier before proceding 178 waitForReachedGlobalBarrier(); 179 rethrowException(); // if Coordinator aborts, will bail from here with exception 180 181 // In traditional 2PC, if a member reaches this state the TX has been committed and the 182 // member is responsible for rolling forward and recovering and completing the subsequent 183 // operations in the case of failure. It cannot rollback. 184 // 185 // This implementation is not 2PC since it can still rollback here, and thus has different 186 // semantics. 187 188 LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator."); 189 byte[] dataToCoordinator = insideBarrier(); 190 LOG.debug("Subprocedure '" + barrierName + "' locally completed"); 191 rethrowException(); 192 193 // Ack that the member has executed and released local barrier 194 rpcs.sendMemberCompleted(this, dataToCoordinator); 195 LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion"); 196 197 // make sure we didn't get an external exception 198 rethrowException(); 199 } catch (Exception e) { 200 String msg = null; 201 if (e instanceof InterruptedException) { 202 msg = "Procedure '" + barrierName + "' aborting due to interrupt!" + 203 " Likely due to pool shutdown."; 204 Thread.currentThread().interrupt(); 205 } else if (e instanceof ForeignException) { 206 msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!"; 207 } else { 208 msg = "Subprocedure '" + barrierName + "' failed!"; 209 } 210 cancel(msg, e); 211 212 LOG.debug("Subprocedure '" + barrierName + "' running cleanup."); 213 cleanup(e); 214 } finally { 215 releasedLocalBarrier.countDown(); 216 217 // tell the timer we are done, if we get here successfully 218 executionTimeoutTimer.complete(); 219 complete = true; 220 LOG.debug("Subprocedure '" + barrierName + "' completed."); 221 return null; 222 } 223 } 224 225 boolean isComplete() { 226 return complete; 227 } 228 229 /** 230 * exposed for testing. 231 */ 232 ForeignExceptionSnare getErrorCheckable() { 233 return this.monitor; 234 } 235 236 /** 237 * The implementation of this method should gather and hold required resources (locks, disk 238 * space, etc) to satisfy the Procedures barrier condition. For example, this would be where 239 * to make all the regions on a RS on the quiescent for an procedure that required all regions 240 * to be globally quiesed. 241 * 242 * Users should override this method. If a quiescent is not required, this is overkill but 243 * can still be used to execute a procedure on all members and to propagate any exceptions. 244 * 245 * @throws ForeignException 246 */ 247 abstract public void acquireBarrier() throws ForeignException; 248 249 /** 250 * The implementation of this method should act with the assumption that the barrier condition 251 * has been satisfied. Continuing the previous example, a condition could be that all RS's 252 * globally have been quiesced, and procedures that require this precondition could be 253 * implemented here. 254 * The implementation should also collect the result of the subprocedure as data to be returned 255 * to the coordinator upon successful completion. 256 * Users should override this method. 257 * @return the data the subprocedure wants to return to coordinator side. 258 * @throws ForeignException 259 */ 260 abstract public byte[] insideBarrier() throws ForeignException; 261 262 /** 263 * Users should override this method. This implementation of this method should rollback and 264 * cleanup any temporary or partially completed state that the {@link #acquireBarrier()} may have 265 * created. 266 * @param e 267 */ 268 abstract public void cleanup(Exception e); 269 270 /** 271 * Method to cancel the Subprocedure by injecting an exception from and external source. 272 * @param cause 273 */ 274 public void cancel(String msg, Throwable cause) { 275 LOG.error(msg, cause); 276 complete = true; 277 if (cause instanceof ForeignException) { 278 monitor.receive((ForeignException) cause); 279 } else { 280 monitor.receive(new ForeignException(getMemberName(), cause)); 281 } 282 } 283 284 /** 285 * Callback for the member rpcs to call when the global barrier has been reached. This 286 * unblocks the main subprocedure exectuion thread so that the Subprocedure's 287 * {@link #insideBarrier()} method can be run. 288 */ 289 public void receiveReachedGlobalBarrier() { 290 inGlobalBarrier.countDown(); 291 } 292 293 // 294 // Subprocedure Internal State interface 295 // 296 297 /** 298 * Wait for the reached global barrier notification. 299 * 300 * Package visibility for testing 301 * 302 * @throws ForeignException 303 * @throws InterruptedException 304 */ 305 void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException { 306 Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency, 307 barrierName + ":remote acquired"); 308 } 309 310 /** 311 * Waits until the entire procedure has globally completed, or has been aborted. 312 * @throws ForeignException 313 * @throws InterruptedException 314 */ 315 public void waitForLocallyCompleted() throws ForeignException, InterruptedException { 316 Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency, 317 barrierName + ":completed"); 318 } 319 320 /** 321 * Empty Subprocedure for testing. 322 * 323 * Must be public for stubbing used in testing to work. 324 */ 325 public static class SubprocedureImpl extends Subprocedure { 326 327 public SubprocedureImpl(ProcedureMember member, String opName, 328 ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) { 329 super(member, opName, monitor, wakeFrequency, timeout); 330 } 331 332 @Override 333 public void acquireBarrier() throws ForeignException {} 334 335 @Override 336 public byte[] insideBarrier() throws ForeignException { 337 return new byte[0]; 338 } 339 340 @Override 341 public void cleanup(Exception e) {} 342 }; 343}