001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.hbase.errorhandling.ForeignException; 028import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 029import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener; 030import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; 031import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 037 038/** 039 * A globally-barriered distributed procedure. This class encapsulates state and methods for 040 * tracking and managing a distributed procedure, as well as aborting if any member encounters a 041 * problem or if a cancellation is requested. 042 * <p> 043 * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()} 044 * method. The procedure contacts all members and waits for all subprocedures to execute 045 * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then 046 * send acquisition info back to the coordinator. If all acquisitions at subprocedures succeed, the 047 * coordinator then will call {@link #sendGlobalBarrierReached()}. This notifies members to execute 048 * the {@link Subprocedure#insideBarrier()} method. The procedure is blocked until all 049 * {@link Subprocedure#insideBarrier} executions complete at the members. When 050 * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to the 051 * coordinator. Once all members complete, the coordinator calls 052 * {@link #sendGlobalBarrierComplete()}. 053 * <p> 054 * If errors are encountered remotely, they are forwarded to the coordinator, and 055 * {@link Subprocedure#cleanup(Exception)} is called. 056 * <p> 057 * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time 058 * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger an 059 * {@link ForeignException} to abort the procedure. This is particularly useful for situations when 060 * running a distributed {@link Subprocedure} so participants can avoid blocking for extreme amounts 061 * of time if one of the participants fails or takes a really long time (e.g. GC pause). 062 * <p> 063 * Users should generally not directly create or subclass instances of this. They are created for 064 * them implicitly via 065 * {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher, String, byte[], List)}} 066 */ 067@InterfaceAudience.Private 068public class Procedure implements Callable<Void>, ForeignExceptionListener { 069 private static final Logger LOG = LoggerFactory.getLogger(Procedure.class); 070 071 // 072 // Arguments and naming 073 // 074 075 // Name of the procedure 076 final private String procName; 077 // Arguments for this procedure execution 078 final private byte[] args; 079 080 // 081 // Execution State 082 // 083 /** latch for waiting until all members have acquire in barrier state */ 084 final CountDownLatch acquiredBarrierLatch; 085 /** latch for waiting until all members have executed and released their in barrier state */ 086 final CountDownLatch releasedBarrierLatch; 087 /** latch for waiting until a procedure has completed */ 088 final CountDownLatch completedLatch; 089 /** monitor to check for errors */ 090 private final ForeignExceptionDispatcher monitor; 091 092 // 093 // Execution Timeout Handling. 094 // 095 096 /** frequency to check for errors (ms) */ 097 protected final long wakeFrequency; 098 protected final TimeoutExceptionInjector timeoutInjector; 099 100 // 101 // Members' and Coordinator's state 102 // 103 104 /** lock to prevent nodes from acquiring and then releasing before we can track them */ 105 private final Object joinBarrierLock = new Object(); 106 private final List<String> acquiringMembers; 107 private final List<String> inBarrierMembers; 108 private final HashMap<String, byte[]> dataFromFinishedMembers; 109 private ProcedureCoordinator coord; 110 111 /** 112 * Creates a procedure. (FOR TESTING) {@link Procedure} state to be run by a 113 * {@link ProcedureCoordinator}. 114 * @param coord coordinator to call back to for general errors (e.g. 115 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}). 116 * @param monitor error monitor to check for external errors 117 * @param wakeFreq frequency to check for errors while waiting 118 * @param timeout amount of time to allow the procedure to run before cancelling 119 * @param procName name of the procedure instance 120 * @param args argument data associated with the procedure instance 121 * @param expectedMembers names of the expected members 122 */ 123 public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq, 124 long timeout, String procName, byte[] args, List<String> expectedMembers) { 125 this.coord = coord; 126 this.acquiringMembers = new ArrayList<>(expectedMembers); 127 this.inBarrierMembers = new ArrayList<>(acquiringMembers.size()); 128 this.dataFromFinishedMembers = new HashMap<>(); 129 this.procName = procName; 130 this.args = args; 131 this.monitor = monitor; 132 this.wakeFrequency = wakeFreq; 133 134 int count = expectedMembers.size(); 135 this.acquiredBarrierLatch = new CountDownLatch(count); 136 this.releasedBarrierLatch = new CountDownLatch(count); 137 this.completedLatch = new CountDownLatch(1); 138 this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout); 139 } 140 141 /** 142 * Create a procedure. Users should generally not directly create instances of this. They are 143 * created them implicitly via 144 * {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher, String, byte[], List)}} 145 * @param coord coordinator to call back to for general errors (e.g. 146 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}). 147 * @param wakeFreq frequency to check for errors while waiting 148 * @param timeout amount of time to allow the procedure to run before cancelling 149 * @param procName name of the procedure instance 150 * @param args argument data associated with the procedure instance 151 * @param expectedMembers names of the expected members 152 */ 153 public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout, String procName, 154 byte[] args, List<String> expectedMembers) { 155 this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args, 156 expectedMembers); 157 } 158 159 public String getName() { 160 return procName; 161 } 162 163 /** 164 * Returns String of the procedure members both trying to enter the barrier and already in barrier 165 */ 166 public String getStatus() { 167 String waiting, done; 168 synchronized (joinBarrierLock) { 169 waiting = acquiringMembers.toString(); 170 done = inBarrierMembers.toString(); 171 } 172 return "Procedure " + procName + " { waiting=" + waiting + " done=" + done + " }"; 173 } 174 175 /** 176 * Get the ForeignExceptionDispatcher 177 * @return the Procedure's monitor. 178 */ 179 public ForeignExceptionDispatcher getErrorMonitor() { 180 return monitor; 181 } 182 183 /** 184 * This call is the main execution thread of the barriered procedure. It sends messages and 185 * essentially blocks until all procedure members acquire or later complete but periodically 186 * checks for foreign exceptions. 187 */ 188 @Override 189 @SuppressWarnings("finally") 190 final public Void call() { 191 LOG.info("Starting procedure '" + procName + "'"); 192 // start the timer 193 timeoutInjector.start(); 194 195 // run the procedure 196 try { 197 // start by checking for error first 198 monitor.rethrowException(); 199 LOG.debug("Procedure '" + procName + "' starting 'acquire'"); 200 sendGlobalBarrierStart(); 201 202 // wait for all the members to report acquisition 203 LOG.debug("Waiting for all members to 'acquire'"); 204 waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired"); 205 monitor.rethrowException(); 206 207 LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution."); 208 sendGlobalBarrierReached(); 209 210 // wait for all members to report barrier release 211 LOG.debug("Waiting for all members to 'release'"); 212 waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released"); 213 214 // make sure we didn't get an error during in barrier execution and release 215 monitor.rethrowException(); 216 LOG.info("Procedure '" + procName + "' execution completed"); 217 } catch (Exception e) { 218 if (e instanceof InterruptedException) { 219 Thread.currentThread().interrupt(); 220 } 221 String msg = "Procedure '" + procName + "' execution failed!"; 222 LOG.error(msg, e); 223 receive(new ForeignException(getName(), e)); 224 } finally { 225 LOG.debug("Running finish phase."); 226 sendGlobalBarrierComplete(); 227 completedLatch.countDown(); 228 229 // tell the timer we are done, if we get here successfully 230 timeoutInjector.complete(); 231 return null; 232 } 233 } 234 235 /** 236 * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute 237 * the {@link Subprocedure#acquireBarrier} step. n 238 */ 239 public void sendGlobalBarrierStart() throws ForeignException { 240 // start the procedure 241 LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members."); 242 try { 243 // send procedure barrier start to specified list of members. cloning the list to avoid 244 // concurrent modification from the controller setting the prepared nodes 245 coord.getRpcs().sendGlobalBarrierAcquire(this, args, 246 Lists.newArrayList(this.acquiringMembers)); 247 } catch (IOException e) { 248 coord.rpcConnectionFailure("Can't reach controller.", e); 249 } catch (IllegalArgumentException e) { 250 throw new ForeignException(getName(), e); 251 } 252 } 253 254 /** 255 * Sends a message to all members that the global barrier condition has been satisfied. This 256 * should only be executed after all members have completed its 257 * {@link Subprocedure#acquireBarrier()} call successfully. This triggers the member 258 * {@link Subprocedure#insideBarrier} method. n 259 */ 260 public void sendGlobalBarrierReached() throws ForeignException { 261 try { 262 // trigger to have member run {@link Subprocedure#insideBarrier} 263 coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers)); 264 } catch (IOException e) { 265 coord.rpcConnectionFailure("Can't reach controller.", e); 266 } 267 } 268 269 /** 270 * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed. 271 * After this executes, the coordinator can assume that any state resources about this barrier 272 * procedure state has been released. 273 */ 274 public void sendGlobalBarrierComplete() { 275 LOG.debug("Finished coordinator procedure - removing self from list of running procedures"); 276 try { 277 coord.getRpcs().resetMembers(this); 278 } catch (IOException e) { 279 coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e); 280 } 281 } 282 283 // 284 // Call backs from other external processes. 285 // 286 287 /** 288 * Call back triggered by an individual member upon successful local barrier acquisition n 289 */ 290 public void barrierAcquiredByMember(String member) { 291 LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName 292 + "' on coordinator"); 293 if (this.acquiringMembers.contains(member)) { 294 synchronized (joinBarrierLock) { 295 if (this.acquiringMembers.remove(member)) { 296 this.inBarrierMembers.add(member); 297 acquiredBarrierLatch.countDown(); 298 } 299 } 300 LOG.debug( 301 "Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier"); 302 } else { 303 LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." 304 + " Continuing on."); 305 } 306 } 307 308 /** 309 * Call back triggered by a individual member upon successful local in-barrier execution and 310 * release nn 311 */ 312 public void barrierReleasedByMember(String member, byte[] dataFromMember) { 313 boolean removed = false; 314 synchronized (joinBarrierLock) { 315 removed = this.inBarrierMembers.remove(member); 316 if (removed) { 317 releasedBarrierLatch.countDown(); 318 } 319 } 320 if (removed) { 321 LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName 322 + "', counting down latch. Waiting for " + releasedBarrierLatch.getCount() + " more"); 323 } else { 324 LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName 325 + "', but we weren't waiting on it to release!"); 326 } 327 dataFromFinishedMembers.put(member, dataFromMember); 328 } 329 330 /** 331 * Waits until the entire procedure has globally completed, or has been aborted. If an exception 332 * is thrown the procedure may or not have run cleanup to trigger the completion latch yet. nn 333 */ 334 public void waitForCompleted() throws ForeignException, InterruptedException { 335 waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed"); 336 } 337 338 /** 339 * Waits until the entire procedure has globally completed, or has been aborted. If an exception 340 * is thrown the procedure may or not have run cleanup to trigger the completion latch yet. 341 * @return data returned from procedure members upon successfully completing subprocedure. nn 342 */ 343 public HashMap<String, byte[]> waitForCompletedWithRet() 344 throws ForeignException, InterruptedException { 345 waitForCompleted(); 346 return dataFromFinishedMembers; 347 } 348 349 /** 350 * Check if the entire procedure has globally completed, or has been aborted. n 351 */ 352 public boolean isCompleted() throws ForeignException { 353 // Rethrow exception if any 354 monitor.rethrowException(); 355 return (completedLatch.getCount() == 0); 356 } 357 358 /** 359 * A callback that handles incoming ForeignExceptions. 360 */ 361 @Override 362 public void receive(ForeignException e) { 363 monitor.receive(e); 364 } 365 366 /** 367 * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to 368 * check for errors 369 * @param latch latch to wait on 370 * @param monitor monitor to check for errors while waiting 371 * @param wakeFrequency frequency to wake up and check for errors (in 372 * {@link TimeUnit#MILLISECONDS}) 373 * @param latchDescription description of the latch, for logging 374 * @throws ForeignException type of error the monitor can throw, if the task fails 375 * @throws InterruptedException if we are interrupted while waiting on latch 376 */ 377 public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor, 378 long wakeFrequency, String latchDescription) throws ForeignException, InterruptedException { 379 boolean released = false; 380 while (!released) { 381 if (monitor != null) { 382 monitor.rethrowException(); 383 } 384 /* 385 * ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + 386 * "' latch. (sleep:" + wakeFrequency + " ms)"); 387 */ 388 released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS); 389 } 390 // check error again in case an error raised during last wait 391 if (monitor != null) { 392 monitor.rethrowException(); 393 } 394 } 395}