001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.master.assignment; 021 022import java.io.IOException; 023import java.util.concurrent.atomic.AtomicBoolean; 024 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.RegionInfo; 028import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 029import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; 030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; 032import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; 033import org.apache.hadoop.hbase.procedure2.Procedure; 034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 036import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 037import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; 038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 048 049/** 050 * Base class for the Assign and Unassign Procedure. 051 * 052 * Locking: 053 * Takes exclusive lock on the region being assigned/unassigned. Thus, there can only be one 054 * RegionTransitionProcedure per region running at a time (see MasterProcedureScheduler). 055 * 056 * <p>This procedure is asynchronous and responds to external events. 057 * The AssignmentManager will notify this procedure when the RS completes 058 * the operation and reports the transitioned state 059 * (see the Assign and Unassign class for more detail).</p> 060 * 061 * <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are 062 * first submitted, to the REGION_TRANSITION_DISPATCH state when the request 063 * to remote server is sent and the Procedure is suspended waiting on external 064 * event to be woken again. Once the external event is triggered, Procedure 065 * moves to the REGION_TRANSITION_FINISH state.</p> 066 * 067 * <p>NOTE: {@link AssignProcedure} and {@link UnassignProcedure} should not be thought of 068 * as being asymmetric, at least currently. 069 * <ul> 070 * <li>{@link AssignProcedure} moves through all the above described states and implements methods 071 * associated with each while {@link UnassignProcedure} starts at state 072 * REGION_TRANSITION_DISPATCH and state REGION_TRANSITION_QUEUE is not supported.</li> 073 * 074 * <li>When any step in {@link AssignProcedure} fails, failure handler 075 * AssignProcedure#handleFailure(MasterProcedureEnv, RegionStateNode) re-attempts the 076 * assignment by setting the procedure state to REGION_TRANSITION_QUEUE and forces 077 * assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When 078 * the number of attempts reaches threshold configuration 'hbase.assignment.maximum.attempts', 079 * the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are 080 * intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it 081 * handles failure. 082 * </li> 083 * <li>If we find a region in an 'unexpected' state, we'll complain and retry with backoff forever. 084 * The 'unexpected' state needs to be fixed either by another running Procedure or by operator 085 * intervention (Regions in 'unexpected' state indicates bug or unexpected transition type). 086 * For this to work, subclasses need to persist the 'attempt' counter kept in this class when 087 * they do serializeStateData and restore it inside their deserializeStateData, just as they do 088 * for {@link #regionInfo}. 089 * </li> 090 * </ul> 091 * </p> 092 * 093 * <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as 094 * possible, re-attempting with any target makes sense if specified target fails in case of 095 * {@link AssignProcedure}. For {@link UnassignProcedure}, our concern is preventing data loss 096 * on failed unassign. See class doc for explanation. 097 */ 098@InterfaceAudience.Private 099public abstract class RegionTransitionProcedure 100 extends Procedure<MasterProcedureEnv> 101 implements TableProcedureInterface, 102 RemoteProcedure<MasterProcedureEnv, ServerName> { 103 private static final Logger LOG = LoggerFactory.getLogger(RegionTransitionProcedure.class); 104 105 protected final AtomicBoolean aborted = new AtomicBoolean(false); 106 107 private RegionTransitionState transitionState = RegionTransitionState.REGION_TRANSITION_QUEUE; 108 /** 109 * This data member must be persisted. Expectation is that it is done by subclasses in their 110 * {@link #serializeStateData(ProcedureStateSerializer)} call, restoring {@link #regionInfo} 111 * in their {@link #deserializeStateData(ProcedureStateSerializer)} method. 112 */ 113 private RegionInfo regionInfo; 114 115 /** 116 * this data member must also be persisted. 117 * @see #regionInfo 118 */ 119 private boolean override; 120 121 /** 122 * Like {@link #regionInfo}, the expectation is that subclasses persist the value of this 123 * data member. It is used doing backoff when Procedure gets stuck. 124 */ 125 private int attempt; 126 127 // Required by the Procedure framework to create the procedure on replay 128 public RegionTransitionProcedure() {} 129 130 public RegionTransitionProcedure(final RegionInfo regionInfo, boolean override) { 131 this.regionInfo = regionInfo; 132 this.override = override; 133 } 134 135 @VisibleForTesting 136 public RegionInfo getRegionInfo() { 137 return regionInfo; 138 } 139 140 /** 141 * This setter is for subclasses to call in their 142 * {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that 143 * subclasses will persist `regioninfo` in their 144 * {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `regionInfo` on 145 * deserialization by calling this. 146 */ 147 protected void setRegionInfo(final RegionInfo regionInfo) { 148 this.regionInfo = regionInfo; 149 } 150 151 /** 152 * This setter is for subclasses to call in their 153 * {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that 154 * subclasses will persist `override` in their 155 * {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `override` on 156 * deserialization by calling this. 157 */ 158 protected void setOverride(boolean override) { 159 this.override = override; 160 } 161 162 163 /** 164 * This setter is for subclasses to call in their 165 * {@link #deserializeStateData(ProcedureStateSerializer)} method. 166 * @see #setRegionInfo(RegionInfo) 167 */ 168 protected void setAttempt(int attempt) { 169 this.attempt = attempt; 170 } 171 172 protected int getAttempt() { 173 return this.attempt; 174 } 175 176 @Override 177 public TableName getTableName() { 178 RegionInfo hri = getRegionInfo(); 179 return hri != null? hri.getTable(): null; 180 } 181 182 public boolean isMeta() { 183 return TableName.isMetaTableName(getTableName()); 184 } 185 186 @Override 187 public void toStringClassDetails(final StringBuilder sb) { 188 sb.append(getClass().getSimpleName()); 189 sb.append(" table="); 190 sb.append(getTableName()); 191 sb.append(", region="); 192 sb.append(getRegionInfo() == null? null: getRegionInfo().getEncodedName()); 193 if (isOverride()) { 194 // Only log if set. 195 sb.append(", override="); 196 sb.append(isOverride()); 197 } 198 } 199 200 public RegionStateNode getRegionState(final MasterProcedureEnv env) { 201 return env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegionInfo()); 202 } 203 204 void setTransitionState(final RegionTransitionState state) { 205 this.transitionState = state; 206 } 207 208 protected RegionTransitionState getTransitionState() { 209 return transitionState; 210 } 211 212 protected abstract boolean startTransition(MasterProcedureEnv env, RegionStateNode regionNode) 213 throws IOException, ProcedureSuspendedException; 214 215 /** 216 * Called when the Procedure is in the REGION_TRANSITION_DISPATCH state. 217 * In here we do the RPC call to OPEN/CLOSE the region. The suspending of 218 * the thread so it sleeps until it gets update that the OPEN/CLOSE has 219 * succeeded is complicated. Read the implementations to learn more. 220 */ 221 protected abstract boolean updateTransition(MasterProcedureEnv env, RegionStateNode regionNode) 222 throws IOException, ProcedureSuspendedException; 223 224 protected abstract void finishTransition(MasterProcedureEnv env, RegionStateNode regionNode) 225 throws IOException, ProcedureSuspendedException; 226 227 protected abstract void reportTransition(MasterProcedureEnv env, 228 RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException; 229 230 @Override 231 public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName); 232 233 /** 234 * @return True if processing of fail is complete; the procedure will be woken from its suspend 235 * and we'll go back to running through procedure steps: 236 * otherwise if false we leave the procedure in suspended state. 237 */ 238 protected abstract boolean remoteCallFailed(MasterProcedureEnv env, 239 RegionStateNode regionNode, IOException exception); 240 241 @Override 242 public void remoteCallCompleted(final MasterProcedureEnv env, 243 final ServerName serverName, final RemoteOperation response) { 244 // Ignore the response? reportTransition() is the one that count? 245 } 246 247 @Override 248 public synchronized boolean remoteCallFailed(final MasterProcedureEnv env, 249 final ServerName serverName, final IOException exception) { 250 final RegionStateNode regionNode = getRegionState(env); 251 LOG.warn("Remote call failed {} {}", this, regionNode.toShortString(), exception); 252 if (remoteCallFailed(env, regionNode, exception)) { 253 // NOTE: This call to wakeEvent puts this Procedure back on the scheduler. 254 // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond 255 // this method. Just get out of this current processing quickly. 256 regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); 257 return true; 258 } 259 return false; 260 // else leave the procedure in suspended state; it is waiting on another call to this callback 261 } 262 263 /** 264 * Be careful! At the end of this method, the procedure has either succeeded 265 * and this procedure has been set into a suspended state OR, we failed and 266 * this procedure has been put back on the scheduler ready for another worker 267 * to pick it up. In both cases, we need to exit the current Worker processing 268 * immediately! 269 * @return True if we successfully dispatched the call and false if we failed; 270 * if failed, we need to roll back any setup done for the dispatch. 271 */ 272 protected boolean addToRemoteDispatcher(final MasterProcedureEnv env, 273 final ServerName targetServer) { 274 LOG.info("Dispatch {}", this); 275 276 // Put this procedure into suspended mode to wait on report of state change 277 // from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'. 278 getRegionState(env).getProcedureEvent().suspend(); 279 280 // Tricky because the below call to addOperationToNode can fail. If it fails, we need to 281 // backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and 282 // ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does 283 // wake to undo the above suspend. 284 try { 285 env.getRemoteDispatcher().addOperationToNode(targetServer, this); 286 } catch (FailedRemoteDispatchException frde) { 287 remoteCallFailed(env, targetServer, frde); 288 return false; 289 } 290 return true; 291 } 292 293 protected void reportTransition(final MasterProcedureEnv env, final ServerName serverName, 294 final TransitionCode code, final long seqId) throws UnexpectedStateException { 295 final RegionStateNode regionNode = getRegionState(env); 296 if (LOG.isDebugEnabled()) { 297 LOG.debug("Received report " + code + " seqId=" + seqId + ", " + 298 this + "; " + regionNode.toShortString()); 299 } 300 if (!serverName.equals(regionNode.getRegionLocation())) { 301 if (isMeta() && regionNode.getRegionLocation() == null) { 302 regionNode.setRegionLocation(serverName); 303 } else { 304 throw new UnexpectedStateException(String.format( 305 "Unexpected state=%s from server=%s; expected server=%s; %s; %s", 306 code, serverName, regionNode.getRegionLocation(), 307 this, regionNode.toShortString())); 308 } 309 } 310 311 reportTransition(env, regionNode, code, seqId); 312 313 // NOTE: This call adds this procedure back on the scheduler. 314 // This makes it so this procedure can run again. Another worker will take 315 // processing to the next stage. At an extreme, the other worker may run in 316 // parallel so DO NOT CHANGE any state hereafter! This should be last thing 317 // done in this processing step. 318 regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); 319 } 320 321 protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) { 322 return isServerOnline(env, regionNode.getRegionLocation()); 323 } 324 325 protected boolean isServerOnline(final MasterProcedureEnv env, final ServerName serverName) { 326 return env.getMasterServices().getServerManager().isServerOnline(serverName); 327 } 328 329 @Override 330 protected void toStringState(StringBuilder builder) { 331 super.toStringState(builder); 332 RegionTransitionState ts = this.transitionState; 333 if (!isFinished() && ts != null) { 334 builder.append(":").append(ts); 335 } 336 } 337 338 @Override 339 protected Procedure[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException { 340 final AssignmentManager am = env.getAssignmentManager(); 341 final RegionStateNode regionNode = getRegionState(env); 342 if (!am.addRegionInTransition(regionNode, this)) { 343 if (this.isOverride()) { 344 LOG.info("{} owned by pid={}, OVERRIDDEN by 'this' (pid={}, override=true).", 345 regionNode.getRegionInfo().getEncodedName(), 346 regionNode.getProcedure().getProcId(), getProcId()); 347 regionNode.unsetProcedure(regionNode.getProcedure()); 348 } else { 349 String msg = String.format("%s owned by pid=%d, CANNOT run 'this' (pid=%d).", 350 regionNode.getRegionInfo().getEncodedName(), 351 regionNode.getProcedure().getProcId(), getProcId()); 352 LOG.warn(msg); 353 setAbortFailure(getClass().getSimpleName(), msg); 354 return null; 355 } 356 } 357 try { 358 boolean retry; 359 do { 360 retry = false; 361 switch (transitionState) { 362 case REGION_TRANSITION_QUEUE: 363 // 1. push into the AM queue for balancer policy 364 if (!startTransition(env, regionNode)) { 365 // The operation figured it is done or it aborted; check getException() 366 am.removeRegionInTransition(getRegionState(env), this); 367 return null; 368 } 369 transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH; 370 if (regionNode.getProcedureEvent().suspendIfNotReady(this)) { 371 // Why this suspend? Because we want to ensure Store happens before proceed? 372 throw new ProcedureSuspendedException(); 373 } 374 break; 375 376 case REGION_TRANSITION_DISPATCH: 377 // 2. send the request to the target server 378 if (!updateTransition(env, regionNode)) { 379 // The operation figured it is done or it aborted; check getException() 380 am.removeRegionInTransition(regionNode, this); 381 return null; 382 } 383 if (transitionState != RegionTransitionState.REGION_TRANSITION_DISPATCH) { 384 retry = true; 385 break; 386 } 387 if (regionNode.getProcedureEvent().suspendIfNotReady(this)) { 388 throw new ProcedureSuspendedException(); 389 } 390 break; 391 392 case REGION_TRANSITION_FINISH: 393 // 3. wait assignment response. completion/failure 394 LOG.debug("Finishing {}; {}", this, regionNode.toShortString()); 395 finishTransition(env, regionNode); 396 am.removeRegionInTransition(regionNode, this); 397 return null; 398 } 399 } while (retry); 400 // If here, success so clear out the attempt counter so we start fresh each time we get stuck. 401 this.attempt = 0; 402 } catch (IOException e) { 403 long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++); 404 LOG.warn("Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " + 405 "by other Procedure or operator intervention", backoff / 1000, this, 406 regionNode.toShortString(), e); 407 setTimeout(Math.toIntExact(backoff)); 408 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 409 throw new ProcedureSuspendedException(); 410 } 411 412 return new Procedure[] {this}; 413 } 414 415 /** 416 * At end of timeout, wake ourselves up so we run again. 417 */ 418 @Override 419 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 420 setState(ProcedureProtos.ProcedureState.RUNNABLE); 421 env.getProcedureScheduler().addFront(this); 422 return false; // 'false' means that this procedure handled the timeout 423 } 424 425 @Override 426 protected void rollback(final MasterProcedureEnv env) { 427 if (isRollbackSupported(transitionState)) { 428 // Nothing done up to this point. abort safely. 429 // This should happen when something like disableTable() is triggered. 430 env.getAssignmentManager().removeRegionInTransition(getRegionState(env), this); 431 return; 432 } 433 434 // There is no rollback for assignment unless we cancel the operation by 435 // dropping/disabling the table. 436 LOG.warn("Unhandled state {}; no rollback for assignment! Doing NOTHING!" + 437 " May need manual intervention; {}", 438 transitionState, this); 439 } 440 441 protected abstract boolean isRollbackSupported(final RegionTransitionState state); 442 443 @Override 444 protected boolean abort(final MasterProcedureEnv env) { 445 if (isRollbackSupported(transitionState)) { 446 aborted.set(true); 447 return true; 448 } 449 return false; 450 } 451 452 @Override 453 protected boolean waitInitialized(MasterProcedureEnv env) { 454 // Unless we are assigning meta, wait for meta to be available and loaded. 455 if (isMeta()) { 456 return false; 457 } 458 AssignmentManager am = env.getAssignmentManager(); 459 return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo); 460 } 461 462 @Override 463 protected LockState acquireLock(final MasterProcedureEnv env) { 464 // TODO: Revisit this and move it to the executor 465 if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) { 466 try { 467 // Enable TRACE on this class to see lock dump. Can be really large when cluster is big 468 // or big tables being enabled/disabled. 469 if (LOG.isTraceEnabled()) { 470 LOG.trace("{} pid={} {}", LockState.LOCK_EVENT_WAIT, getProcId(), 471 env.getProcedureScheduler().dumpLocks()); 472 } 473 } catch (IOException e) { 474 // ignore, just for logging 475 } 476 return LockState.LOCK_EVENT_WAIT; 477 } 478 return LockState.LOCK_ACQUIRED; 479 } 480 481 @Override 482 protected void releaseLock(final MasterProcedureEnv env) { 483 env.getProcedureScheduler().wakeRegion(this, getRegionInfo()); 484 } 485 486 @Override 487 protected boolean holdLock(final MasterProcedureEnv env) { 488 return true; 489 } 490 491 @Override 492 protected boolean shouldWaitClientAck(MasterProcedureEnv env) { 493 // The operation is triggered internally on the server 494 // the client does not know about this procedure. 495 return false; 496 } 497 498 /** 499 * Used by ServerCrashProcedure to see if this Assign/Unassign needs processing. 500 * @return ServerName the Assign or Unassign is going against. 501 */ 502 public abstract ServerName getServer(final MasterProcedureEnv env); 503 504 @Override 505 protected void bypass(MasterProcedureEnv env) { 506 // This override is just so I can write a note on how bypass is done in 507 // RTP. For RTP procedures -- i.e. assign/unassign -- if bypass is called, 508 // we intentionally do NOT cleanup our state. We leave a reference to the 509 // bypassed Procedure in the RegionStateNode. Doing this makes it so the 510 // RSN is in an odd state. The bypassed Procedure is finished but no one 511 // else can make progress on this RSN entity (see the #execute above where 512 // we check the RSN to see if an already registered procedure and if so, 513 // we exit without proceeding). This is done to intentionally block 514 // subsequent Procedures from running. Only a Procedure with the 'override' flag 515 // set can overwrite the RSN and make progress. 516 super.bypass(env); 517 } 518 519 boolean isOverride() { 520 return this.override; 521 } 522}