/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
import org.apache.hadoop.hbase.master.procedure.ServerCrashException;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UnassignRegionStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;

/**
 * Procedure that describes the unassignment of a single region.
 * There can only be one RegionTransitionProcedure -- i.e. an assign or an unassign -- per region
 * running at a time, since each procedure takes a lock on the region.
 *
 * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
 * queue, and the procedure will then go into a "waiting state" (suspend).
 * The Remote Dispatcher will batch the various requests for that server and
 * they will be sent to the RS for execution.
 * The RS will complete the close operation by calling master.reportRegionStateTransition().
 * The AM will intercept the transition report, and notify this procedure.
 * The procedure will wake up and finish the unassign by publishing its new state in meta.
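 *
 * <p>A rough sketch of the happy path through this procedure's states (summarizing the code
 * below; not an exhaustive list of transitions): the constructor jumps straight to
 * REGION_TRANSITION_DISPATCH, updateTransition marks the region CLOSING and queues the
 * RegionCloseOperation, the procedure suspends, and when the RS reports CLOSED,
 * reportTransition moves us to REGION_TRANSITION_FINISH where finishTransition marks the
 * region CLOSED (or deletes it from the in-memory states if removeAfterUnassigning is set).
 *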
 * <p>If we are unable to contact the remote regionserver, whether because of a ConnectException
 * or a socket timeout, we will call expire on the server we were trying to contact. We will remain
 * in suspended state waiting for a wake up from the ServerCrashProcedure that is processing the
 * failed server. The basic idea is that if we notice a crashed server, then we have a
 * responsibility; i.e. we should not let go of the region until we are sure the server that was
 * hosting it has had its crash processed. If we let go of the region before then, an assign might
 * run before the logs have been split, which would make for data loss.
 *
 * <p>TODO: Rather than this tricky coordination between SCP and this Procedure, instead, work on
 * returning an SCP as our subprocedure; this probably needs work on the framework,
 * especially if the SCP has already been created.
 */
@InterfaceAudience.Private
public class UnassignProcedure extends RegionTransitionProcedure {
  private static final Logger LOG = LoggerFactory.getLogger(UnassignProcedure.class);

  /**
   * Where to send the unassign RPC.
   * This value may not be accurate since another RTP may change the location of
   * the region. The hostingServer will be updated in updateTransition.
   */
  protected volatile ServerName hostingServer;
  /**
   * The Server we will subsequently assign the region to (can be null).
   */
  protected volatile ServerName destinationServer;

  /**
   * Whether to delete the region from the in-memory states after unassigning it.
   */
  private boolean removeAfterUnassigning;

  public UnassignProcedure() {
    // Required by the Procedure framework to create the procedure on replay
    super();
  }

  public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
      final boolean force, final boolean removeAfterUnassigning) {
    this(regionInfo, hostingServer, null, force, removeAfterUnassigning);
  }

  public UnassignProcedure(final RegionInfo regionInfo,
      final ServerName hostingServer, final ServerName destinationServer, final boolean force) {
    this(regionInfo, hostingServer, destinationServer, force, false);
  }

  public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
      final ServerName destinationServer, final boolean override,
      final boolean removeAfterUnassigning) {
    super(regionInfo, override);
    this.hostingServer = hostingServer;
    this.destinationServer = destinationServer;
    this.removeAfterUnassigning = removeAfterUnassigning;

    // we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request
    setTransitionState(RegionTransitionState.REGION_TRANSITION_DISPATCH);
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.REGION_UNASSIGN;
  }

  @Override
  protected boolean isRollbackSupported(final RegionTransitionState state) {
    switch (state) {
      case REGION_TRANSITION_QUEUE:
      case REGION_TRANSITION_DISPATCH:
        return true;
      default:
        return false;
    }
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer)
      throws IOException {
    UnassignRegionStateData.Builder state = UnassignRegionStateData.newBuilder()
        .setTransitionState(getTransitionState())
        .setRegionInfo(ProtobufUtil.toRegionInfo(getRegionInfo()));
    if (this.hostingServer != null) {
      state.setHostingServer(ProtobufUtil.toServerName(this.hostingServer));
    }
    if (this.destinationServer != null) {
      state.setDestinationServer(ProtobufUtil.toServerName(destinationServer));
    }
    if (isOverride()) {
      state.setForce(true);
    }
    if (removeAfterUnassigning) {
      state.setRemoveAfterUnassigning(true);
    }
    if (getAttempt() > 0) {
      state.setAttempt(getAttempt());
    }
    serializer.serialize(state.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer)
      throws IOException {
    final UnassignRegionStateData state =
        serializer.deserialize(UnassignRegionStateData.class);
    setTransitionState(state.getTransitionState());
    setRegionInfo(ProtobufUtil.toRegionInfo(state.getRegionInfo()));
    // The 'force' flag is the override flag in unassign.
    setOverride(state.getForce());
    this.hostingServer =
        state.hasHostingServer() ? ProtobufUtil.toServerName(state.getHostingServer()) : null;
    if (state.hasDestinationServer()) {
      this.destinationServer = ProtobufUtil.toServerName(state.getDestinationServer());
    }
    removeAfterUnassigning = state.getRemoveAfterUnassigning();
    if (state.hasAttempt()) {
      setAttempt(state.getAttempt());
    }
  }

  @Override
  protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) {
    // nothing to do here. we skip the step in the constructor
    // by jumping to REGION_TRANSITION_DISPATCH
    throw new UnsupportedOperationException();
  }

  @Override
  protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
      throws IOException {
    // if the region is already closed or offline we can't do much...
    if (regionNode.isInState(State.CLOSED, State.OFFLINE)) {
      LOG.info("Not unassigned " + this + "; " + regionNode.toShortString());
      return false;
    }

    // if we haven't started the operation yet, we can abort
    if (aborted.get() && regionNode.isInState(State.OPEN)) {
      setAbortFailure(getClass().getSimpleName(), "abort requested");
      return false;
    }

    if (regionNode.getRegionLocation() != null && !regionNode
        .getRegionLocation().equals(hostingServer)) {
      LOG.info("HostingServer changed from {} to {} for {}", hostingServer,
          regionNode.getRegionLocation(), this);
      this.hostingServer = regionNode.getRegionLocation();
    }

    // Mark the region as CLOSING.
    env.getAssignmentManager().markRegionAsClosing(regionNode);

    // Add the close region operation to the server dispatch queue.
    if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
      // If addToRemoteDispatcher fails, it calls the callback #remoteCallFailed.
    }

    // Return true to keep the procedure running.
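    // (The framework will then suspend this procedure until the RS reports the region CLOSED via
    // reportRegionStateTransition, or until remoteCallFailed is invoked; see the class comment.)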
    return true;
  }

  @Override
  protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
      throws IOException {
    AssignmentManager am = env.getAssignmentManager();
    RegionInfo regionInfo = getRegionInfo();

    if (!removeAfterUnassigning) {
      am.markRegionAsClosed(regionNode);
    } else {
      // Remove from in-memory states
      am.getRegionStates().deleteRegion(regionInfo);
      env.getMasterServices().getServerManager().removeRegion(regionInfo);
      FavoredNodesManager fnm = env.getMasterServices().getFavoredNodesManager();
      if (fnm != null) {
        fnm.deleteFavoredNodesForRegions(Lists.newArrayList(regionInfo));
      }
    }
  }

  @Override
  public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) {
    assert serverName.equals(getRegionState(env).getRegionLocation());
    return new RegionCloseOperation(this, getRegionInfo(), this.destinationServer);
  }

  @Override
  protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode,
      final TransitionCode code, final long seqId) throws UnexpectedStateException {
    switch (code) {
      case CLOSED:
        setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
        break;
      default:
        throw new UnexpectedStateException(String.format(
          "Received report unexpected transition state=%s for region=%s server=%s, expected CLOSED.",
          code, regionNode.getRegionInfo(), regionNode.getRegionLocation()));
    }
  }

  /**
   * Our remote call failed, but there are a few states where it is safe to proceed with the
   * unassign; e.g. if a server crashed and has had all of its WALs processed, then we can allow
   * this unassign to go to completion.
   * @return True if it is safe to proceed with the unassign.
   */
  private boolean isSafeToProceed(final MasterProcedureEnv env, final RegionStateNode regionNode,
      final IOException exception) {
    if (exception instanceof ServerCrashException) {
      // This exception comes from ServerCrashProcedure AFTER log splitting. It's a signaling
      // exception. SCP found this region as a RIT during its processing of the crash. Its call
      // into here says it is ok to let this procedure go complete.
      LOG.info("Safe to let procedure move to next step; {}", this);
      return true;
    }
    if (exception instanceof NotServingRegionException) {
      LOG.warn("IS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD {}", regionNode, exception);
      return true;
    }
    return false;
  }

  /**
   * Set it up so that when the procedure is unsuspended, we'll move to the procedure finish.
   */
  protected void proceed(final MasterProcedureEnv env, final RegionStateNode regionNode) {
    try {
      reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
    } catch (UnexpectedStateException e) {
      // Should never happen.
      throw new RuntimeException(e);
    }
  }

  /**
   * @return If true, we will re-wake up this procedure; if false, the procedure stays suspended.
   */
  @Override
  protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
      final IOException exception) {
    // Be careful reading the below; we do returns in the middle of the method a few times.
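    // A summary of the cases handled below (descriptive only; the code is authoritative):
    //  - isSafeToProceed (ServerCrashException/NotServingRegionException): act as though the
    //    region closed and wake up to finish.
    //  - RegionServerAborted/StoppedException: stay suspended until the ServerCrashProcedure
    //    for the hosting server wakes us.
    //  - ServerNotRunningYetException: wake up and retry the dispatch.
    //  - Anything else: expire the server and stay suspended, unless log splitting for it is
    //    already done, in which case it is safe to finish now.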
    if (isSafeToProceed(env, regionNode, exception)) {
      proceed(env, regionNode);
    } else if (exception instanceof RegionServerAbortedException ||
        exception instanceof RegionServerStoppedException) {
      // RS is aborting/stopping, we cannot offline the region since the region may need to do WAL
      // recovery. Until we see the RS expiration, stay suspended; return false.
      LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
      return false;
    } else if (exception instanceof ServerNotRunningYetException) {
      // This should not happen. If it does, the procedure will be woken up and we'll retry.
      // TODO: Needs a pause and backoff?
      LOG.info("Retry", exception);
    } else {
      // We failed to RPC this server. Set it as expired.
      ServerName serverName = regionNode.getRegionLocation();
      LOG.warn("Expiring {}, {} {}; exception={}", serverName, this, regionNode.toShortString(),
          exception.getClass().getSimpleName());
      if (!env.getMasterServices().getServerManager().expireServer(serverName)) {
        // Failed to queue an expire. Lots of possible reasons including it may be already expired.
        // In ServerCrashProcedure, there is a handleRIT stage where we
        // will iterate over all the RIT procedures for the related regions of a crashed RS and
        // fail them with ServerCrashException. See the isSafeToProceed method above for
        // more details.
        // This works for most cases, but since we do not hold the region lock in handleRIT,
        // there could be a race where we arrive here after the handleRIT stage of the SCP. So here
        // we need to check whether it is safe to quit.
        // Notice that the first assumption is that we can only quit after the log splitting is
        // done, as MRP can schedule an AssignProcedure right after us, and if the log splitting has
        // not been done then there will be data loss. And in SCP, we will change the state from
        // SPLITTING to OFFLINE (or SPLITTING_META_DONE for meta log processing) after finishing the
        // log splitting, and then call handleRIT, so checking the state here can be a safe
        // fence. If the state is not OFFLINE (or SPLITTING_META_DONE), then we can just leave this
        // procedure in suspended state as we can make sure that handleRIT has not been executed
        // yet and it will wake us up later. And if the state is OFFLINE (or SPLITTING_META_DONE),
        // we can safely quit since there will be no data loss. There could be duplicated
        // AssignProcedures for the same region but it is OK as we will do a check at the beginning
        // of AssignProcedure to prevent double assign. And there we have the region lock, so there
        // will be no race.
        if (env.getAssignmentManager().isLogSplittingDone(serverName, isMeta())) {
          // It's ok to proceed with this unassign.
          LOG.info("{} is dead and processed; moving procedure to finished state; {}", serverName,
              this);
          proceed(env, regionNode);
          // Return true; wake up the procedure so we can act on proceed.
          return true;
        }
        LOG.info("Failed expiration and log splitting not done on {}", serverName);
      }
      // Return false so this procedure stays in suspended state. It will be woken up by the
      // ServerCrashProcedure that was scheduled when we called #expireServer above. SCP calls
      // #handleRIT which will call this method again, only this time the exception will be a
      // ServerCrashException (see above).
      // TODO: Add an SCP as a new subprocedure that we now come to depend on.
      return false;
    }
    return true;
  }

  @Override
  public void toStringClassDetails(StringBuilder sb) {
    super.toStringClassDetails(sb);
    sb.append(", server=").append(this.hostingServer);
  }

  @Override
  public ServerName getServer(final MasterProcedureEnv env) {
    RegionStateNode node =
        env.getAssignmentManager().getRegionStates().getRegionStateNode(this.getRegionInfo());
    if (node == null) {
      return null;
    }
    return node.getRegionLocation();
  }

  @Override
  protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
    return env.getAssignmentManager().getAssignmentManagerMetrics().getUnassignProcMetrics();
  }
}