001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master.assignment; 020 021import java.io.IOException; 022import java.util.concurrent.atomic.AtomicBoolean; 023import org.apache.hadoop.hbase.ServerName; 024import org.apache.hadoop.hbase.TableName; 025import org.apache.hadoop.hbase.client.RegionInfo; 026import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 027import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; 028import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 029import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; 030import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; 031import org.apache.hadoop.hbase.procedure2.Procedure; 032import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 033import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 034import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 035import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; 036import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; 037import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 043 044import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 047 048/** 049 * Base class for the Assign and Unassign Procedure. 050 * 051 * Locking: 052 * Takes exclusive lock on the region being assigned/unassigned. Thus, there can only be one 053 * RegionTransitionProcedure per region running at a time (see MasterProcedureScheduler). 054 * 055 * <p>This procedure is asynchronous and responds to external events. 056 * The AssignmentManager will notify this procedure when the RS completes 057 * the operation and reports the transitioned state 058 * (see the Assign and Unassign class for more detail).</p> 059 * 060 * <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are 061 * first submitted, to the REGION_TRANSITION_DISPATCH state when the request 062 * to remote server is sent and the Procedure is suspended waiting on external 063 * event to be woken again. Once the external event is triggered, Procedure 064 * moves to the REGION_TRANSITION_FINISH state.</p> 065 * 066 * <p>NOTE: {@link AssignProcedure} and {@link UnassignProcedure} should not be thought of 067 * as being asymmetric, at least currently. 068 * <ul> 069 * <li>{@link AssignProcedure} moves through all the above described states and implements methods 070 * associated with each while {@link UnassignProcedure} starts at state 071 * REGION_TRANSITION_DISPATCH and state REGION_TRANSITION_QUEUE is not supported.</li> 072 * 073 * <li>When any step in {@link AssignProcedure} fails, failure handler 074 * AssignProcedure#handleFailure(MasterProcedureEnv, RegionStateNode) re-attempts the 075 * assignment by setting the procedure state to REGION_TRANSITION_QUEUE and forces 076 * assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When 077 * the number of attempts reaches threshold configuration 'hbase.assignment.maximum.attempts', 078 * the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are 079 * intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it 080 * handles failure. 081 * </li> 082 * <li>If we find a region in an 'unexpected' state, we'll complain and retry with backoff forever. 083 * The 'unexpected' state needs to be fixed either by another running Procedure or by operator 084 * intervention (Regions in 'unexpected' state indicates bug or unexpected transition type). 085 * For this to work, subclasses need to persist the 'attempt' counter kept in this class when 086 * they do serializeStateData and restore it inside their deserializeStateData, just as they do 087 * for {@link #regionInfo}. 088 * </li> 089 * </ul> 090 * </p> 091 * 092 * <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as 093 * possible, re-attempting with any target makes sense if specified target fails in case of 094 * {@link AssignProcedure}. For {@link UnassignProcedure}, our concern is preventing data loss 095 * on failed unassign. See class doc for explanation. 096 */ 097@InterfaceAudience.Private 098public abstract class RegionTransitionProcedure 099 extends Procedure<MasterProcedureEnv> 100 implements TableProcedureInterface, 101 RemoteProcedure<MasterProcedureEnv, ServerName> { 102 private static final Logger LOG = LoggerFactory.getLogger(RegionTransitionProcedure.class); 103 104 protected final AtomicBoolean aborted = new AtomicBoolean(false); 105 106 private RegionTransitionState transitionState = RegionTransitionState.REGION_TRANSITION_QUEUE; 107 /** 108 * This data member must be persisted. Expectation is that it is done by subclasses in their 109 * {@link #serializeStateData(ProcedureStateSerializer)} call, restoring {@link #regionInfo} 110 * in their {@link #deserializeStateData(ProcedureStateSerializer)} method. 111 */ 112 private RegionInfo regionInfo; 113 114 /** 115 * this data member must also be persisted. 116 * @see #regionInfo 117 */ 118 private boolean override; 119 120 /** 121 * Like {@link #regionInfo}, the expectation is that subclasses persist the value of this 122 * data member. It is used doing backoff when Procedure gets stuck. 123 */ 124 private int attempt; 125 126 // Required by the Procedure framework to create the procedure on replay 127 public RegionTransitionProcedure() {} 128 129 public RegionTransitionProcedure(final RegionInfo regionInfo, boolean override) { 130 this.regionInfo = regionInfo; 131 this.override = override; 132 } 133 134 @VisibleForTesting 135 public RegionInfo getRegionInfo() { 136 return regionInfo; 137 } 138 139 /** 140 * This setter is for subclasses to call in their 141 * {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that 142 * subclasses will persist `regioninfo` in their 143 * {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `regionInfo` on 144 * deserialization by calling this. 145 */ 146 protected void setRegionInfo(final RegionInfo regionInfo) { 147 this.regionInfo = regionInfo; 148 } 149 150 /** 151 * This setter is for subclasses to call in their 152 * {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that 153 * subclasses will persist `override` in their 154 * {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `override` on 155 * deserialization by calling this. 156 */ 157 protected void setOverride(boolean override) { 158 this.override = override; 159 } 160 161 162 /** 163 * This setter is for subclasses to call in their 164 * {@link #deserializeStateData(ProcedureStateSerializer)} method. 165 * @see #setRegionInfo(RegionInfo) 166 */ 167 protected void setAttempt(int attempt) { 168 this.attempt = attempt; 169 } 170 171 protected int getAttempt() { 172 return this.attempt; 173 } 174 175 @Override 176 public TableName getTableName() { 177 RegionInfo hri = getRegionInfo(); 178 return hri != null? hri.getTable(): null; 179 } 180 181 public boolean isMeta() { 182 return TableName.isMetaTableName(getTableName()); 183 } 184 185 @Override 186 public void toStringClassDetails(final StringBuilder sb) { 187 sb.append(getClass().getSimpleName()); 188 sb.append(" table="); 189 sb.append(getTableName()); 190 sb.append(", region="); 191 sb.append(getRegionInfo() == null? null: getRegionInfo().getEncodedName()); 192 if (isOverride()) { 193 // Only log if set. 194 sb.append(", override="); 195 sb.append(isOverride()); 196 } 197 } 198 199 public RegionStateNode getRegionState(final MasterProcedureEnv env) { 200 return env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegionInfo()); 201 } 202 203 void setTransitionState(final RegionTransitionState state) { 204 this.transitionState = state; 205 } 206 207 protected RegionTransitionState getTransitionState() { 208 return transitionState; 209 } 210 211 protected abstract boolean startTransition(MasterProcedureEnv env, RegionStateNode regionNode) 212 throws IOException, ProcedureSuspendedException; 213 214 /** 215 * Called when the Procedure is in the REGION_TRANSITION_DISPATCH state. 216 * In here we do the RPC call to OPEN/CLOSE the region. The suspending of 217 * the thread so it sleeps until it gets update that the OPEN/CLOSE has 218 * succeeded is complicated. Read the implementations to learn more. 219 */ 220 protected abstract boolean updateTransition(MasterProcedureEnv env, RegionStateNode regionNode) 221 throws IOException, ProcedureSuspendedException; 222 223 protected abstract void finishTransition(MasterProcedureEnv env, RegionStateNode regionNode) 224 throws IOException, ProcedureSuspendedException; 225 226 protected abstract void reportTransition(MasterProcedureEnv env, 227 RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException; 228 229 @Override 230 public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName); 231 232 /** 233 * @return True if processing of fail is complete; the procedure will be woken from its suspend 234 * and we'll go back to running through procedure steps: 235 * otherwise if false we leave the procedure in suspended state. 236 */ 237 protected abstract boolean remoteCallFailed(MasterProcedureEnv env, 238 RegionStateNode regionNode, IOException exception); 239 240 @Override 241 public synchronized boolean remoteCallFailed(final MasterProcedureEnv env, 242 final ServerName serverName, final IOException exception) { 243 final RegionStateNode regionNode = getRegionState(env); 244 LOG.warn("Remote call failed {} {}", this, regionNode.toShortString(), exception); 245 if (remoteCallFailed(env, regionNode, exception)) { 246 // NOTE: This call to wakeEvent puts this Procedure back on the scheduler. 247 // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond 248 // this method. Just get out of this current processing quickly. 249 regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); 250 return true; 251 } 252 // else leave the procedure in suspended state; it is waiting on another call to this callback 253 return false; 254 } 255 256 /** 257 * Be careful! At the end of this method, the procedure has either succeeded 258 * and this procedure has been set into a suspended state OR, we failed and 259 * this procedure has been put back on the scheduler ready for another worker 260 * to pick it up. In both cases, we need to exit the current Worker processing 261 * immediately! 262 * @return True if we successfully dispatched the call and false if we failed; 263 * if failed, we need to roll back any setup done for the dispatch. 264 */ 265 protected boolean addToRemoteDispatcher(final MasterProcedureEnv env, 266 final ServerName targetServer) { 267 LOG.info("Dispatch {}", this); 268 269 // Put this procedure into suspended mode to wait on report of state change 270 // from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'. 271 getRegionState(env).getProcedureEvent().suspend(); 272 273 // Tricky because the below call to addOperationToNode can fail. If it fails, we need to 274 // backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and 275 // ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does 276 // wake to undo the above suspend. 277 try { 278 env.getRemoteDispatcher().addOperationToNode(targetServer, this); 279 } catch (FailedRemoteDispatchException frde) { 280 remoteCallFailed(env, targetServer, frde); 281 return false; 282 } 283 return true; 284 } 285 286 @Override 287 public boolean storeInDispatchedQueue() { 288 return false; 289 } 290 291 protected void reportTransition(final MasterProcedureEnv env, final ServerName serverName, 292 final TransitionCode code, final long seqId) throws UnexpectedStateException { 293 final RegionStateNode regionNode = getRegionState(env); 294 if (LOG.isDebugEnabled()) { 295 LOG.debug("Received report " + code + " seqId=" + seqId + ", " + 296 this + "; " + regionNode.toShortString()); 297 } 298 if (!serverName.equals(regionNode.getRegionLocation())) { 299 if (isMeta() && regionNode.getRegionLocation() == null) { 300 regionNode.setRegionLocation(serverName); 301 } else { 302 throw new UnexpectedStateException(String.format( 303 "Unexpected state=%s from server=%s; expected server=%s; %s; %s", 304 code, serverName, regionNode.getRegionLocation(), 305 this, regionNode.toShortString())); 306 } 307 } 308 309 reportTransition(env, regionNode, code, seqId); 310 311 // NOTE: This call adds this procedure back on the scheduler. 312 // This makes it so this procedure can run again. Another worker will take 313 // processing to the next stage. At an extreme, the other worker may run in 314 // parallel so DO NOT CHANGE any state hereafter! This should be last thing 315 // done in this processing step. 316 regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); 317 } 318 319 protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) { 320 return isServerOnline(env, regionNode.getRegionLocation()); 321 } 322 323 protected boolean isServerOnline(final MasterProcedureEnv env, final ServerName serverName) { 324 return env.getMasterServices().getServerManager().isServerOnline(serverName); 325 } 326 327 @Override 328 protected void toStringState(StringBuilder builder) { 329 super.toStringState(builder); 330 RegionTransitionState ts = this.transitionState; 331 if (!isFinished() && ts != null) { 332 builder.append(":").append(ts); 333 } 334 } 335 336 @Override 337 protected Procedure[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException { 338 final AssignmentManager am = env.getAssignmentManager(); 339 final RegionStateNode regionNode = getRegionState(env); 340 if (!am.addRegionInTransition(regionNode, this)) { 341 if (this.isOverride()) { 342 LOG.info("{} owned by pid={}, OVERRIDDEN by 'this' (pid={}, override=true).", 343 regionNode.getRegionInfo().getEncodedName(), 344 regionNode.getProcedure().getProcId(), getProcId()); 345 regionNode.unsetProcedure(regionNode.getProcedure()); 346 } else { 347 String msg = String.format("%s owned by pid=%d, CANNOT run 'this' (pid=%d).", 348 regionNode.getRegionInfo().getEncodedName(), 349 regionNode.getProcedure().getProcId(), getProcId()); 350 LOG.warn(msg); 351 setAbortFailure(getClass().getSimpleName(), msg); 352 return null; 353 } 354 } 355 try { 356 boolean retry; 357 do { 358 retry = false; 359 switch (transitionState) { 360 case REGION_TRANSITION_QUEUE: 361 // 1. push into the AM queue for balancer policy 362 if (!startTransition(env, regionNode)) { 363 // The operation figured it is done or it aborted; check getException() 364 am.removeRegionInTransition(getRegionState(env), this); 365 return null; 366 } 367 transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH; 368 if (regionNode.getProcedureEvent().suspendIfNotReady(this)) { 369 // Why this suspend? Because we want to ensure Store happens before proceed? 370 throw new ProcedureSuspendedException(); 371 } 372 break; 373 374 case REGION_TRANSITION_DISPATCH: 375 // 2. send the request to the target server 376 if (!updateTransition(env, regionNode)) { 377 // The operation figured it is done or it aborted; check getException() 378 am.removeRegionInTransition(regionNode, this); 379 return null; 380 } 381 if (transitionState != RegionTransitionState.REGION_TRANSITION_DISPATCH) { 382 retry = true; 383 break; 384 } 385 if (regionNode.getProcedureEvent().suspendIfNotReady(this)) { 386 throw new ProcedureSuspendedException(); 387 } 388 break; 389 390 case REGION_TRANSITION_FINISH: 391 // 3. wait assignment response. completion/failure 392 LOG.debug("Finishing {}; {}", this, regionNode.toShortString()); 393 finishTransition(env, regionNode); 394 am.removeRegionInTransition(regionNode, this); 395 return null; 396 } 397 } while (retry); 398 // If here, success so clear out the attempt counter so we start fresh each time we get stuck. 399 this.attempt = 0; 400 } catch (IOException e) { 401 long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++); 402 LOG.warn("Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " + 403 "by other Procedure or operator intervention", backoff / 1000, this, 404 regionNode.toShortString(), e); 405 setTimeout(Math.toIntExact(backoff)); 406 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 407 throw new ProcedureSuspendedException(); 408 } 409 410 return new Procedure[] {this}; 411 } 412 413 /** 414 * At end of timeout, wake ourselves up so we run again. 415 */ 416 @Override 417 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 418 setState(ProcedureProtos.ProcedureState.RUNNABLE); 419 env.getProcedureScheduler().addFront(this); 420 return false; // 'false' means that this procedure handled the timeout 421 } 422 423 @Override 424 protected void rollback(final MasterProcedureEnv env) { 425 if (isRollbackSupported(transitionState)) { 426 // Nothing done up to this point. abort safely. 427 // This should happen when something like disableTable() is triggered. 428 env.getAssignmentManager().removeRegionInTransition(getRegionState(env), this); 429 return; 430 } 431 432 // There is no rollback for assignment unless we cancel the operation by 433 // dropping/disabling the table. 434 LOG.warn("Unhandled state {}; no rollback for assignment! Doing NOTHING!" + 435 " May need manual intervention; {}", 436 transitionState, this); 437 } 438 439 protected abstract boolean isRollbackSupported(final RegionTransitionState state); 440 441 @Override 442 protected boolean abort(final MasterProcedureEnv env) { 443 if (isRollbackSupported(transitionState)) { 444 aborted.set(true); 445 return true; 446 } 447 return false; 448 } 449 450 @Override 451 protected boolean waitInitialized(MasterProcedureEnv env) { 452 // Unless we are assigning meta, wait for meta to be available and loaded. 453 if (isMeta()) { 454 return false; 455 } 456 AssignmentManager am = env.getAssignmentManager(); 457 return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo); 458 } 459 460 @Override 461 protected LockState acquireLock(final MasterProcedureEnv env) { 462 // TODO: Revisit this and move it to the executor 463 if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) { 464 try { 465 // Enable TRACE on this class to see lock dump. Can be really large when cluster is big 466 // or big tables being enabled/disabled. 467 if (LOG.isTraceEnabled()) { 468 LOG.trace("{} pid={} {}", LockState.LOCK_EVENT_WAIT, getProcId(), 469 env.getProcedureScheduler().dumpLocks()); 470 } 471 } catch (IOException e) { 472 // ignore, just for logging 473 } 474 return LockState.LOCK_EVENT_WAIT; 475 } 476 return LockState.LOCK_ACQUIRED; 477 } 478 479 @Override 480 protected void releaseLock(final MasterProcedureEnv env) { 481 env.getProcedureScheduler().wakeRegion(this, getRegionInfo()); 482 } 483 484 @Override 485 protected boolean holdLock(final MasterProcedureEnv env) { 486 return true; 487 } 488 489 @Override 490 protected boolean shouldWaitClientAck(MasterProcedureEnv env) { 491 // The operation is triggered internally on the server 492 // the client does not know about this procedure. 493 return false; 494 } 495 496 /** 497 * Used by ServerCrashProcedure to see if this Assign/Unassign needs processing. 498 * @return ServerName the Assign or Unassign is going against. 499 */ 500 public abstract ServerName getServer(final MasterProcedureEnv env); 501 502 @Override 503 public void remoteOperationCompleted(MasterProcedureEnv env) { 504 // should not be called for region operation until we modified the open/close region procedure 505 throw new UnsupportedOperationException(); 506 } 507 508 @Override 509 public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { 510 // should not be called for region operation until we modified the open/close region procedure 511 throw new UnsupportedOperationException(); 512 } 513 514 @Override 515 protected void bypass(MasterProcedureEnv env) { 516 // This override is just so I can write a note on how bypass is done in 517 // RTP. For RTP procedures -- i.e. assign/unassign -- if bypass is called, 518 // we intentionally do NOT cleanup our state. We leave a reference to the 519 // bypassed Procedure in the RegionStateNode. Doing this makes it so the 520 // RSN is in an odd state. The bypassed Procedure is finished but no one 521 // else can make progress on this RSN entity (see the #execute above where 522 // we check the RSN to see if an already registered procedure and if so, 523 // we exit without proceeding). This is done to intentionally block 524 // subsequent Procedures from running. Only a Procedure with the 'override' flag 525 // set can overwrite the RSN and make progress. 526 super.bypass(env); 527 } 528 529 boolean isOverride() { 530 return this.override; 531 } 532}