001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.TimeUnit; 027 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.apache.hadoop.hbase.errorhandling.ForeignException; 032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 033import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener; 034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; 035import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector; 036 037import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 038 039/** 040 * A globally-barriered distributed procedure. This class encapsulates state and methods for 041 * tracking and managing a distributed procedure, as well as aborting if any member encounters 042 * a problem or if a cancellation is requested. 043 * <p> 044 * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()} 045 * method. The procedure contacts all members and waits for all subprocedures to execute 046 * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then 047 * send acquisition info back to the coordinator. If all acquisitions at subprocedures succeed, 048 * the coordinator then will call {@link #sendGlobalBarrierReached()}. This notifies members to 049 * execute the {@link Subprocedure#insideBarrier()} method. The procedure is blocked until all 050 * {@link Subprocedure#insideBarrier} executions complete at the members. When 051 * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to 052 * the coordinator. Once all members complete, the coordinator calls 053 * {@link #sendGlobalBarrierComplete()}. 054 * <p> 055 * If errors are encountered remotely, they are forwarded to the coordinator, and 056 * {@link Subprocedure#cleanup(Exception)} is called. 057 * <p> 058 * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time 059 * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger 060 * an {@link ForeignException} to abort the procedure. This is particularly useful for situations 061 * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme 062 * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause). 063 * <p> 064 * Users should generally not directly create or subclass instances of this. They are created 065 * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher, 066 * String, byte[], List)}} 067 */ 068@InterfaceAudience.Private 069public class Procedure implements Callable<Void>, ForeignExceptionListener { 070 private static final Logger LOG = LoggerFactory.getLogger(Procedure.class); 071 072 // 073 // Arguments and naming 074 // 075 076 // Name of the procedure 077 final private String procName; 078 // Arguments for this procedure execution 079 final private byte[] args; 080 081 // 082 // Execution State 083 // 084 /** latch for waiting until all members have acquire in barrier state */ 085 final CountDownLatch acquiredBarrierLatch; 086 /** latch for waiting until all members have executed and released their in barrier state */ 087 final CountDownLatch releasedBarrierLatch; 088 /** latch for waiting until a procedure has completed */ 089 final CountDownLatch completedLatch; 090 /** monitor to check for errors */ 091 private final ForeignExceptionDispatcher monitor; 092 093 // 094 // Execution Timeout Handling. 095 // 096 097 /** frequency to check for errors (ms) */ 098 protected final long wakeFrequency; 099 protected final TimeoutExceptionInjector timeoutInjector; 100 101 // 102 // Members' and Coordinator's state 103 // 104 105 /** lock to prevent nodes from acquiring and then releasing before we can track them */ 106 private final Object joinBarrierLock = new Object(); 107 private final List<String> acquiringMembers; 108 private final List<String> inBarrierMembers; 109 private final HashMap<String, byte[]> dataFromFinishedMembers; 110 private ProcedureCoordinator coord; 111 112 /** 113 * Creates a procedure. (FOR TESTING) 114 * 115 * {@link Procedure} state to be run by a {@link ProcedureCoordinator}. 116 * @param coord coordinator to call back to for general errors (e.g. 117 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}). 118 * @param monitor error monitor to check for external errors 119 * @param wakeFreq frequency to check for errors while waiting 120 * @param timeout amount of time to allow the procedure to run before cancelling 121 * @param procName name of the procedure instance 122 * @param args argument data associated with the procedure instance 123 * @param expectedMembers names of the expected members 124 */ 125 public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq, 126 long timeout, String procName, byte[] args, List<String> expectedMembers) { 127 this.coord = coord; 128 this.acquiringMembers = new ArrayList<>(expectedMembers); 129 this.inBarrierMembers = new ArrayList<>(acquiringMembers.size()); 130 this.dataFromFinishedMembers = new HashMap<>(); 131 this.procName = procName; 132 this.args = args; 133 this.monitor = monitor; 134 this.wakeFrequency = wakeFreq; 135 136 int count = expectedMembers.size(); 137 this.acquiredBarrierLatch = new CountDownLatch(count); 138 this.releasedBarrierLatch = new CountDownLatch(count); 139 this.completedLatch = new CountDownLatch(1); 140 this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout); 141 } 142 143 /** 144 * Create a procedure. 145 * 146 * Users should generally not directly create instances of this. They are created them 147 * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher, 148 * String, byte[], List)}} 149 * 150 * @param coord coordinator to call back to for general errors (e.g. 151 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}). 152 * @param wakeFreq frequency to check for errors while waiting 153 * @param timeout amount of time to allow the procedure to run before cancelling 154 * @param procName name of the procedure instance 155 * @param args argument data associated with the procedure instance 156 * @param expectedMembers names of the expected members 157 */ 158 public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout, 159 String procName, byte[] args, List<String> expectedMembers) { 160 this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args, 161 expectedMembers); 162 } 163 164 public String getName() { 165 return procName; 166 } 167 168 /** 169 * @return String of the procedure members both trying to enter the barrier and already in barrier 170 */ 171 public String getStatus() { 172 String waiting, done; 173 synchronized (joinBarrierLock) { 174 waiting = acquiringMembers.toString(); 175 done = inBarrierMembers.toString(); 176 } 177 return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }"; 178 } 179 180 /** 181 * Get the ForeignExceptionDispatcher 182 * @return the Procedure's monitor. 183 */ 184 public ForeignExceptionDispatcher getErrorMonitor() { 185 return monitor; 186 } 187 188 /** 189 * This call is the main execution thread of the barriered procedure. It sends messages and 190 * essentially blocks until all procedure members acquire or later complete but periodically 191 * checks for foreign exceptions. 192 */ 193 @Override 194 @SuppressWarnings("finally") 195 final public Void call() { 196 LOG.info("Starting procedure '" + procName + "'"); 197 // start the timer 198 timeoutInjector.start(); 199 200 // run the procedure 201 try { 202 // start by checking for error first 203 monitor.rethrowException(); 204 LOG.debug("Procedure '" + procName + "' starting 'acquire'"); 205 sendGlobalBarrierStart(); 206 207 // wait for all the members to report acquisition 208 LOG.debug("Waiting for all members to 'acquire'"); 209 waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired"); 210 monitor.rethrowException(); 211 212 LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution."); 213 sendGlobalBarrierReached(); 214 215 // wait for all members to report barrier release 216 LOG.debug("Waiting for all members to 'release'"); 217 waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released"); 218 219 // make sure we didn't get an error during in barrier execution and release 220 monitor.rethrowException(); 221 LOG.info("Procedure '" + procName + "' execution completed"); 222 } catch (Exception e) { 223 if (e instanceof InterruptedException) { 224 Thread.currentThread().interrupt(); 225 } 226 String msg = "Procedure '" + procName +"' execution failed!"; 227 LOG.error(msg, e); 228 receive(new ForeignException(getName(), e)); 229 } finally { 230 LOG.debug("Running finish phase."); 231 sendGlobalBarrierComplete(); 232 completedLatch.countDown(); 233 234 // tell the timer we are done, if we get here successfully 235 timeoutInjector.complete(); 236 return null; 237 } 238 } 239 240 /** 241 * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute 242 * the {@link Subprocedure#acquireBarrier} step. 243 * @throws ForeignException 244 */ 245 public void sendGlobalBarrierStart() throws ForeignException { 246 // start the procedure 247 LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members."); 248 try { 249 // send procedure barrier start to specified list of members. cloning the list to avoid 250 // concurrent modification from the controller setting the prepared nodes 251 coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers)); 252 } catch (IOException e) { 253 coord.rpcConnectionFailure("Can't reach controller.", e); 254 } catch (IllegalArgumentException e) { 255 throw new ForeignException(getName(), e); 256 } 257 } 258 259 /** 260 * Sends a message to all members that the global barrier condition has been satisfied. This 261 * should only be executed after all members have completed its 262 * {@link Subprocedure#acquireBarrier()} call successfully. This triggers the member 263 * {@link Subprocedure#insideBarrier} method. 264 * @throws ForeignException 265 */ 266 public void sendGlobalBarrierReached() throws ForeignException { 267 try { 268 // trigger to have member run {@link Subprocedure#insideBarrier} 269 coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers)); 270 } catch (IOException e) { 271 coord.rpcConnectionFailure("Can't reach controller.", e); 272 } 273 } 274 275 /** 276 * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed. 277 * After this executes, the coordinator can assume that any state resources about this barrier 278 * procedure state has been released. 279 */ 280 public void sendGlobalBarrierComplete() { 281 LOG.debug("Finished coordinator procedure - removing self from list of running procedures"); 282 try { 283 coord.getRpcs().resetMembers(this); 284 } catch (IOException e) { 285 coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e); 286 } 287 } 288 289 // 290 // Call backs from other external processes. 291 // 292 293 /** 294 * Call back triggered by an individual member upon successful local barrier acquisition 295 * @param member 296 */ 297 public void barrierAcquiredByMember(String member) { 298 LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName 299 + "' on coordinator"); 300 if (this.acquiringMembers.contains(member)) { 301 synchronized (joinBarrierLock) { 302 if (this.acquiringMembers.remove(member)) { 303 this.inBarrierMembers.add(member); 304 acquiredBarrierLatch.countDown(); 305 } 306 } 307 LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier"); 308 } else { 309 LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." + 310 " Continuing on."); 311 } 312 } 313 314 /** 315 * Call back triggered by a individual member upon successful local in-barrier execution and 316 * release 317 * @param member 318 * @param dataFromMember 319 */ 320 public void barrierReleasedByMember(String member, byte[] dataFromMember) { 321 boolean removed = false; 322 synchronized (joinBarrierLock) { 323 removed = this.inBarrierMembers.remove(member); 324 if (removed) { 325 releasedBarrierLatch.countDown(); 326 } 327 } 328 if (removed) { 329 LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName 330 + "', counting down latch. Waiting for " + releasedBarrierLatch.getCount() 331 + " more"); 332 } else { 333 LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName 334 + "', but we weren't waiting on it to release!"); 335 } 336 dataFromFinishedMembers.put(member, dataFromMember); 337 } 338 339 /** 340 * Waits until the entire procedure has globally completed, or has been aborted. If an 341 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch 342 * yet. 343 * @throws ForeignException 344 * @throws InterruptedException 345 */ 346 public void waitForCompleted() throws ForeignException, InterruptedException { 347 waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed"); 348 } 349 350 /** 351 * Waits until the entire procedure has globally completed, or has been aborted. If an 352 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch 353 * yet. 354 * @return data returned from procedure members upon successfully completing subprocedure. 355 * @throws ForeignException 356 * @throws InterruptedException 357 */ 358 public HashMap<String, byte[]> waitForCompletedWithRet() throws ForeignException, InterruptedException { 359 waitForCompleted(); 360 return dataFromFinishedMembers; 361 } 362 363 /** 364 * Check if the entire procedure has globally completed, or has been aborted. 365 * @throws ForeignException 366 */ 367 public boolean isCompleted() throws ForeignException { 368 // Rethrow exception if any 369 monitor.rethrowException(); 370 return (completedLatch.getCount() == 0); 371 } 372 373 /** 374 * A callback that handles incoming ForeignExceptions. 375 */ 376 @Override 377 public void receive(ForeignException e) { 378 monitor.receive(e); 379 } 380 381 /** 382 * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to 383 * check for errors 384 * @param latch latch to wait on 385 * @param monitor monitor to check for errors while waiting 386 * @param wakeFrequency frequency to wake up and check for errors (in 387 * {@link TimeUnit#MILLISECONDS}) 388 * @param latchDescription description of the latch, for logging 389 * @throws ForeignException type of error the monitor can throw, if the task fails 390 * @throws InterruptedException if we are interrupted while waiting on latch 391 */ 392 public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor, 393 long wakeFrequency, String latchDescription) throws ForeignException, 394 InterruptedException { 395 boolean released = false; 396 while (!released) { 397 if (monitor != null) { 398 monitor.rethrowException(); 399 } 400 /* 401 ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:" 402 + wakeFrequency + " ms)"); */ 403 released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS); 404 } 405 // check error again in case an error raised during last wait 406 if (monitor != null) { 407 monitor.rethrowException(); 408 } 409 } 410}