001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import java.io.IOException; 021import java.util.Optional; 022import java.util.concurrent.CompletableFuture; 023import org.apache.hadoop.hbase.HConstants; 024import org.apache.hadoop.hbase.ServerName; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.RegionInfo; 027import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 028import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 029import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; 030import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; 031import org.apache.hadoop.hbase.procedure2.Procedure; 032import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 033import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil; 034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 036import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 037import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 039import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; 040import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 041import org.apache.hadoop.hbase.util.RetryCounter; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 052 053/** 054 * The base class for the remote procedures used to open/close a region. 055 * <p/> 056 * Notice that here we do not care about the result of the remote call, if the remote call is 057 * finished, either succeeded or not, we will always finish the procedure. The parent procedure 058 * should take care of the result and try to reschedule if the result is not good. 059 */ 060@InterfaceAudience.Private 061public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedureEnv> 062 implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> { 063 064 private static final Logger LOG = LoggerFactory.getLogger(RegionRemoteProcedureBase.class); 065 066 protected RegionInfo region; 067 068 protected ServerName targetServer; 069 070 private RegionRemoteProcedureBaseState state = 071 RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH; 072 073 private TransitionCode transitionCode; 074 075 private long seqId; 076 077 private RetryCounter retryCounter; 078 079 private CompletableFuture<Void> future; 080 081 protected RegionRemoteProcedureBase() { 082 } 083 084 protected RegionRemoteProcedureBase(TransitRegionStateProcedure parent, RegionInfo region, 085 ServerName targetServer) { 086 this.region = region; 087 this.targetServer = targetServer; 088 parent.attachRemoteProc(this); 089 } 090 091 @Override 092 public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env, 093 ServerName remote) { 094 // REPORT_SUCCEED means that this remote open/close request already executed in RegionServer. 095 // So return empty operation and RSProcedureDispatcher no need to send it again. 096 if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) { 097 return Optional.empty(); 098 } 099 return Optional.of(newRemoteOperation(env)); 100 } 101 102 protected abstract RemoteProcedureDispatcher.RemoteOperation 103 newRemoteOperation(MasterProcedureEnv env); 104 105 @Override 106 public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { 107 // should not be called since we use reportRegionStateTransition to report the result 108 throw new UnsupportedOperationException(); 109 } 110 111 @Override 112 public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { 113 // should not be called since we use reportRegionStateTransition to report the result 114 throw new UnsupportedOperationException(); 115 } 116 117 private RegionStateNode getRegionNode(MasterProcedureEnv env) { 118 return env.getAssignmentManager().getRegionStates().getRegionStateNode(region); 119 } 120 121 @Override 122 public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) { 123 RegionStateNode regionNode = getRegionNode(env); 124 regionNode.lock(); 125 try { 126 if (!env.getMasterServices().getServerManager().isServerOnline(remote)) { 127 // the SCP will interrupt us, give up 128 LOG.debug("{} for region {}, targetServer {} is dead, SCP will interrupt us, give up", this, 129 regionNode, remote); 130 return; 131 } 132 if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) { 133 // not sure how can this happen but anyway let's add a check here to avoid waking the wrong 134 // procedure... 135 LOG.warn("{} for region {}, targetServer={} has already been woken up, ignore", this, 136 regionNode, remote); 137 return; 138 } 139 LOG.warn("The remote operation {} for region {} to server {} failed", this, regionNode, 140 remote, exception); 141 // It is OK to not persist the state here, as we do not need to change the region state if the 142 // remote call is failed. If the master crashed before we actually execute the procedure and 143 // persist the new state, it is fine to retry on the same target server again. 144 state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH_FAIL; 145 regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); 146 } finally { 147 regionNode.unlock(); 148 } 149 } 150 151 @Override 152 public TableName getTableName() { 153 return region.getTable(); 154 } 155 156 @Override 157 protected boolean waitInitialized(MasterProcedureEnv env) { 158 if (isCriticalSystemTable()) { 159 return false; 160 } 161 if (TableName.isMetaTableName(getTableName())) { 162 return false; 163 } 164 // First we need meta to be loaded, and second, if meta is not online then we will likely to 165 // fail when updating meta so we wait until it is assigned. 166 AssignmentManager am = env.getAssignmentManager(); 167 return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, region); 168 } 169 170 @Override 171 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 172 throw new UnsupportedOperationException(); 173 } 174 175 @Override 176 protected boolean abort(MasterProcedureEnv env) { 177 return false; 178 } 179 180 // do some checks to see if the report is valid 181 protected abstract void checkTransition(RegionStateNode regionNode, TransitionCode transitionCode, 182 long seqId) throws UnexpectedStateException; 183 184 // change the in memory state of the regionNode, but do not update meta. 185 protected abstract void updateTransitionWithoutPersistingToMeta(MasterProcedureEnv env, 186 RegionStateNode regionNode, TransitionCode transitionCode, long seqId) throws IOException; 187 188 // A bit strange but the procedure store will throw RuntimeException if we can not persist the 189 // state, so upper layer should take care of this... 190 private void persistAndWake(MasterProcedureEnv env, RegionStateNode regionNode) { 191 // The synchronization here is to guard with ProcedureExecutor.executeRollback, as here we will 192 // not hold the procedure execution lock, but we should not persist a procedure in ROLLEDBACK 193 // state to the procedure store. 194 // The ProcedureStore.update must be inside the lock, so here the check for procedure state and 195 // update could be atomic. In ProcedureExecutor.cleanupAfterRollbackOneStep, we will set the 196 // state to ROLLEDBACK, which will hold the same lock too as the Procedure.setState method is 197 // synchronized. This is the key to keep us safe. 198 synchronized (this) { 199 if (getState() == ProcedureState.ROLLEDBACK) { 200 LOG.warn("Procedure {} has already been rolled back, skip persistent", this); 201 return; 202 } 203 env.getMasterServices().getMasterProcedureExecutor().getStore().update(this); 204 } 205 regionNode.getProcedureEvent().wake(env.getProcedureScheduler()); 206 } 207 208 // should be called with RegionStateNode locked, to avoid race with the execute method below 209 void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName, 210 TransitionCode transitionCode, long seqId) throws IOException { 211 if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) { 212 // should be a retry 213 return; 214 } 215 if (!targetServer.equals(serverName)) { 216 throw new UnexpectedStateException("Received report from " + serverName + ", expected " 217 + targetServer + ", " + regionNode + ", proc=" + this); 218 } 219 checkTransition(regionNode, transitionCode, seqId); 220 // this state means we have received the report from RS, does not mean the result is fine, as we 221 // may received a FAILED_OPEN. 222 this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED; 223 this.transitionCode = transitionCode; 224 this.seqId = seqId; 225 // Persist the transition code and openSeqNum(if provided). 226 // We should not update the hbase:meta directly as this may cause races when master restarts, 227 // as the old active master may incorrectly report back to RS and cause the new master to hang 228 // on a OpenRegionProcedure forever. See HBASE-22060 and HBASE-22074 for more details. 229 boolean succ = false; 230 try { 231 persistAndWake(env, regionNode); 232 succ = true; 233 } finally { 234 if (!succ) { 235 this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH; 236 this.transitionCode = null; 237 this.seqId = HConstants.NO_SEQNUM; 238 } 239 } 240 try { 241 updateTransitionWithoutPersistingToMeta(env, regionNode, transitionCode, seqId); 242 } catch (IOException e) { 243 throw new AssertionError("should not happen", e); 244 } 245 } 246 247 void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) { 248 if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH) { 249 // should be a retry 250 return; 251 } 252 RegionRemoteProcedureBaseState oldState = state; 253 // it is possible that the state is in REGION_REMOTE_PROCEDURE_SERVER_CRASH, think of this 254 // sequence 255 // 1. region is open on the target server and the above reportTransition call is succeeded 256 // 2. before we are woken up and update the meta, the target server crashes, and then we arrive 257 // here 258 this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH; 259 boolean succ = false; 260 try { 261 persistAndWake(env, regionNode); 262 succ = true; 263 } finally { 264 if (!succ) { 265 this.state = oldState; 266 } 267 } 268 } 269 270 protected abstract void restoreSucceedState(AssignmentManager am, RegionStateNode regionNode, 271 long seqId) throws IOException; 272 273 void stateLoaded(AssignmentManager am, RegionStateNode regionNode) { 274 if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) { 275 try { 276 restoreSucceedState(am, regionNode, seqId); 277 } catch (IOException e) { 278 // should not happen as we are just restoring the state 279 throw new AssertionError(e); 280 } 281 } 282 } 283 284 private TransitRegionStateProcedure getParent(MasterProcedureEnv env) { 285 return (TransitRegionStateProcedure) env.getMasterServices().getMasterProcedureExecutor() 286 .getProcedure(getParentProcId()); 287 } 288 289 private void unattach(MasterProcedureEnv env) { 290 getParent(env).unattachRemoteProc(this); 291 } 292 293 private CompletableFuture<Void> getFuture() { 294 return future; 295 } 296 297 private void setFuture(CompletableFuture<Void> f) { 298 future = f; 299 } 300 301 @Override 302 protected void beforeExec(MasterProcedureEnv env) throws ProcedureSuspendedException { 303 RegionStateNode regionNode = getRegionNode(env); 304 if (!regionNode.isLockedBy(this)) { 305 // The wake up action will be called under the lock inside RegionStateNode for implementing 306 // RegionStateNodeLock, so if we call ProcedureUtil.wakeUp where we will acquire the procedure 307 // execution lock directly, it may cause dead lock since in normal case procedure execution 308 // case, we will acquire the procedure execution lock first and then acquire the lock inside 309 // RegionStateNodeLock. This is the reason why we need to schedule the task to a thread pool 310 // and execute asynchronously. 311 regionNode.lock(this, 312 () -> env.getAsyncTaskExecutor().execute(() -> ProcedureFutureUtil.wakeUp(this, env))); 313 } 314 } 315 316 @Override 317 protected void afterExec(MasterProcedureEnv env) { 318 // only release the lock if there is no pending updating meta operation 319 if (future == null) { 320 RegionStateNode regionNode = getRegionNode(env); 321 // in beforeExec, we may throw ProcedureSuspendedException which means we do not get the lock, 322 // in this case we should not call unlock 323 if (regionNode.isLockedBy(this)) { 324 regionNode.unlock(this); 325 } 326 } 327 } 328 329 @Override 330 protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 331 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 332 RegionStateNode regionNode = getRegionNode(env); 333 try { 334 switch (state) { 335 case REGION_REMOTE_PROCEDURE_DISPATCH: { 336 // The code which wakes us up also needs to lock the RSN so here we do not need to 337 // synchronize on the event. 338 ProcedureEvent<?> event = regionNode.getProcedureEvent(); 339 try { 340 env.getRemoteDispatcher().addOperationToNode(targetServer, this); 341 } catch (FailedRemoteDispatchException e) { 342 LOG.warn("Can not add remote operation {} for region {} to server {}, this usually " 343 + "because the server is alread dead, give up and mark the procedure as complete, " 344 + "the parent procedure will take care of this.", this, region, targetServer, e); 345 unattach(env); 346 return null; 347 } 348 event.suspend(); 349 event.suspendIfNotReady(this); 350 throw new ProcedureSuspendedException(); 351 } 352 case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED: 353 if ( 354 ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, 355 () -> unattach(env)) 356 ) { 357 return null; 358 } 359 ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, 360 env.getAssignmentManager().persistToMeta(regionNode), env, () -> unattach(env)); 361 return null; 362 case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL: 363 // the remote call is failed so we do not need to change the region state, just return. 364 unattach(env); 365 return null; 366 case REGION_REMOTE_PROCEDURE_SERVER_CRASH: 367 if ( 368 ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, 369 () -> unattach(env)) 370 ) { 371 return null; 372 } 373 ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, 374 env.getAssignmentManager().regionClosedAbnormally(regionNode), env, 375 () -> unattach(env)); 376 return null; 377 default: 378 throw new IllegalStateException("Unknown state: " + state); 379 } 380 } catch (IOException e) { 381 if (retryCounter == null) { 382 retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); 383 } 384 long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); 385 LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e); 386 throw suspend(Math.toIntExact(backoff), true); 387 } 388 } 389 390 @Override 391 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 392 setState(ProcedureProtos.ProcedureState.RUNNABLE); 393 env.getProcedureScheduler().addFront(this); 394 return false; // 'false' means that this procedure handled the timeout 395 } 396 397 @Override 398 public boolean storeInDispatchedQueue() { 399 return false; 400 } 401 402 @Override 403 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 404 RegionRemoteProcedureBaseStateData.Builder builder = 405 RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region)) 406 .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state); 407 if (transitionCode != null) { 408 builder.setTransitionCode(transitionCode); 409 builder.setSeqId(seqId); 410 } 411 serializer.serialize(builder.build()); 412 } 413 414 @Override 415 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 416 RegionRemoteProcedureBaseStateData data = 417 serializer.deserialize(RegionRemoteProcedureBaseStateData.class); 418 region = ProtobufUtil.toRegionInfo(data.getRegion()); 419 targetServer = ProtobufUtil.toServerName(data.getTargetServer()); 420 // 'state' may not be present if we are reading an 'old' form of this pb Message. 421 if (data.hasState()) { 422 state = data.getState(); 423 } 424 if (data.hasTransitionCode()) { 425 transitionCode = data.getTransitionCode(); 426 seqId = data.getSeqId(); 427 } 428 } 429 430 @Override 431 protected void afterReplay(MasterProcedureEnv env) { 432 getParent(env).attachRemoteProc(this); 433 } 434 435 @Override 436 public String getProcName() { 437 return getClass().getSimpleName() + " " + region.getEncodedName(); 438 } 439 440 @Override 441 protected void toStringClassDetails(StringBuilder builder) { 442 builder.append(getProcName()); 443 if (targetServer != null) { 444 builder.append(", server="); 445 builder.append(this.targetServer); 446 } 447 if (this.retryCounter != null) { 448 builder.append(", retry="); 449 builder.append(this.retryCounter); 450 } 451 } 452}