/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.assignment;

import java.io.IOException;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
import org.apache.hadoop.hbase.master.procedure.ServerCrashException;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UnassignRegionStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;

/**
 * Procedure that describes the unassignment of a single region.
 * There can only be one RegionTransitionProcedure -- i.e. an assign or an unassign -- per region
 * running at a time, since each procedure takes a lock on the region.
 *
 * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
 * queue, and the procedure will then go into a "waiting state" (suspend).
 * The Remote Dispatcher will batch the various requests for that server and
 * they will be sent to the RS for execution.
 * The RS will complete the close operation by calling master.reportRegionStateTransition().
 * The AM will intercept the transition report, and notify this procedure.
 * The procedure will wakeup and finish the unassign by publishing its new state on meta.
 * <p>If we are unable to contact the remote regionserver whether because of ConnectException
 * or socket timeout, we will call expire on the server we were trying to contact. We will remain
 * in suspended state waiting for a wake up from the ServerCrashProcedure that is processing the
 * failed server. The basic idea is that if we notice a crashed server, then we have a
 * responsibility; i.e. we should not let go of the region until we are sure the server that was
 * hosting has had its crash processed. If we let go of the region before then, an assign might
 * run before the logs have been split which would make for data loss.
 *
 * <p>TODO: Rather than this tricky coordination between SCP and this Procedure, instead, work on
 * returning a SCP as our subprocedure; probably needs work on the framework to do this,
 * especially if the SCP already created.
 */
@InterfaceAudience.Private
public class UnassignProcedure extends RegionTransitionProcedure {
  private static final Logger LOG = LoggerFactory.getLogger(UnassignProcedure.class);

  /**
   * Where to send the unassign RPC.
   * This may not be accurate, since another RTP may change the location of
   * the region. The hostingServer is refreshed in {@link #updateTransition}.
   */
  protected volatile ServerName hostingServer;

  /**
   * The server we will subsequently assign the region to (can be null).
   */
  protected volatile ServerName destinationServer;

  /**
   * Whether to delete the region from the in-memory states after unassigning it.
   */
  private boolean removeAfterUnassigning;

  /**
   * No-arg constructor required by the Procedure framework to create the procedure on replay.
   */
  public UnassignProcedure() {
    // Required by the Procedure framework to create the procedure on replay
    super();
  }

  /**
   * Creates an unassign with no destination server.
   * @param regionInfo the region to unassign
   * @param hostingServer where to send the unassign RPC
   * @param force passed on as the override flag
   * @param removeAfterUnassigning whether to delete the region from in-memory states afterwards
   */
  public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
      final boolean force, final boolean removeAfterUnassigning) {
    this(regionInfo, hostingServer, null, force, removeAfterUnassigning);
  }

  /**
   * Creates an unassign that keeps the region in the in-memory states.
   * @param regionInfo the region to unassign
   * @param hostingServer where to send the unassign RPC
   * @param destinationServer the server the region will subsequently be assigned to; may be null
   * @param force passed on as the override flag
   */
  public UnassignProcedure(final RegionInfo regionInfo,
      final ServerName hostingServer, final ServerName destinationServer, final boolean force) {
    this(regionInfo, hostingServer, destinationServer, force, false);
  }

  /**
   * Creates an unassign and positions it directly at the dispatch step.
   * @param regionInfo the region to unassign
   * @param hostingServer where to send the unassign RPC
   * @param destinationServer the server the region will subsequently be assigned to; may be null
   * @param override override flag handed to the superclass
   * @param removeAfterUnassigning whether to delete the region from in-memory states afterwards
   */
  public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
      final ServerName destinationServer, final boolean override,
      final boolean removeAfterUnassigning) {
    super(regionInfo, override);
    this.hostingServer = hostingServer;
    this.destinationServer = destinationServer;
    this.removeAfterUnassigning = removeAfterUnassigning;

    // we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request
    setTransitionState(RegionTransitionState.REGION_TRANSITION_DISPATCH);
  }

  /**
   * @return always {@link TableOperationType#REGION_UNASSIGN}
   */
  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.REGION_UNASSIGN;
  }

  /**
   * @return true only for the QUEUE and DISPATCH transition states; false for any other state.
   */
  @Override
  protected boolean isRollbackSupported(final RegionTransitionState state) {
    switch (state) {
      case REGION_TRANSITION_QUEUE:
      case REGION_TRANSITION_DISPATCH:
        return true;
      default:
        return false;
    }
  }

  /**
   * Writes this procedure's state as an {@link UnassignRegionStateData} message. The optional
   * fields (hosting server, destination server, force, removeAfterUnassigning, attempt) are only
   * set when they carry a non-default value.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer)
      throws IOException {
    UnassignRegionStateData.Builder state = UnassignRegionStateData.newBuilder()
        .setTransitionState(getTransitionState())
        .setRegionInfo(ProtobufUtil.toRegionInfo(getRegionInfo()));
    if (this.hostingServer != null) {
      state.setHostingServer(ProtobufUtil.toServerName(this.hostingServer));
    }
    if (this.destinationServer != null) {
      state.setDestinationServer(ProtobufUtil.toServerName(destinationServer));
    }
    if (isOverride()) {
      state.setForce(true);
    }
    if (removeAfterUnassigning) {
      state.setRemoveAfterUnassigning(true);
    }
    if (getAttempt() > 0) {
      state.setAttempt(getAttempt());
    }
    serializer.serialize(state.build());
  }

  /**
   * Restores this procedure's state from an {@link UnassignRegionStateData} message; the
   * counterpart of {@link #serializeStateData(ProcedureStateSerializer)}.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer)
      throws IOException {
    final UnassignRegionStateData state =
        serializer.deserialize(UnassignRegionStateData.class);
    setTransitionState(state.getTransitionState());
    setRegionInfo(ProtobufUtil.toRegionInfo(state.getRegionInfo()));
    // The 'force' flag is the override flag in unassign.
    setOverride(state.getForce());
    this.hostingServer =
        state.hasHostingServer() ? ProtobufUtil.toServerName(state.getHostingServer()) : null;
    if (state.hasDestinationServer()) {
      this.destinationServer = ProtobufUtil.toServerName(state.getDestinationServer());
    }
    removeAfterUnassigning = state.getRemoveAfterUnassigning();
    if (state.hasAttempt()) {
      setAttempt(state.getAttempt());
    }
  }

  /**
   * Not supported: the constructor sets the state to REGION_TRANSITION_DISPATCH, skipping this
   * step.
   * @throws UnsupportedOperationException always
   */
  @Override
  protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) {
    // nothing to do here. we skip the step in the constructor
    // by jumping to REGION_TRANSITION_DISPATCH
    throw new UnsupportedOperationException();
  }

  /**
   * Marks the region as CLOSING and adds the close operation to the remote dispatcher.
   * @return false if the region is already CLOSED/OFFLINE, or if an abort was requested while
   *         the region is still OPEN; true otherwise, to keep the procedure running.
   */
  @Override
  protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
        throws IOException {
    // if the region is already closed or offline we can't do much...
    if (regionNode.isInState(State.CLOSED, State.OFFLINE)) {
      LOG.info("Not unassigned {}; {}", this, regionNode.toShortString());
      return false;
    }

    // if we haven't started the operation yet, we can abort
    if (aborted.get() && regionNode.isInState(State.OPEN)) {
      setAbortFailure(getClass().getSimpleName(), "abort requested");
      return false;
    }

    // Read the location once so the null check, comparison, log and assignment all agree.
    final ServerName currentLocation = regionNode.getRegionLocation();
    if (currentLocation != null && !currentLocation.equals(hostingServer)) {
      LOG.info("HostingServer changed from {} to {} for {}", hostingServer,
          currentLocation, this);
      this.hostingServer = currentLocation;
    }

    // Mark the region as CLOSING.
    env.getAssignmentManager().markRegionAsClosing(regionNode);

    // Add the close region operation to the server dispatch queue. The result is deliberately
    // ignored: if addToRemoteDispatcher fails, it calls the callback #remoteCallFailed.
    addToRemoteDispatcher(env, regionNode.getRegionLocation());

    // Return true to keep the procedure running.
    return true;
  }

  /**
   * Completes the unassign. Either marks the region as closed, or -- when
   * {@code removeAfterUnassigning} is set -- removes it from the in-memory region states, the
   * server manager and (if a FavoredNodesManager is present) the favored nodes.
   */
  @Override
  protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
      throws IOException {
    AssignmentManager am = env.getAssignmentManager();
    RegionInfo regionInfo = getRegionInfo();

    if (!removeAfterUnassigning) {
      am.markRegionAsClosed(regionNode);
    } else {
      // Remove from in-memory states
      am.getRegionStates().deleteRegion(regionInfo);
      am.getRegionStates().removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
      env.getMasterServices().getServerManager().removeRegion(regionInfo);
      FavoredNodesManager fnm = env.getMasterServices().getFavoredNodesManager();
      if (fnm != null) {
        fnm.deleteFavoredNodesForRegions(Lists.newArrayList(regionInfo));
      }
    }
  }

  /**
   * Builds the close operation for this region, carrying the destination server (may be null).
   * @param serverName asserted to equal the region's current location
   */
  @Override
  public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) {
    assert serverName.equals(getRegionState(env).getRegionLocation());
    return new RegionCloseOperation(this, getRegionInfo(), this.destinationServer);
  }

  /**
   * Handles a reported transition: CLOSED moves this procedure to REGION_TRANSITION_FINISH.
   * @throws UnexpectedStateException for any transition code other than CLOSED
   */
  @Override
  protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode,
      final TransitionCode code, final long seqId) throws UnexpectedStateException {
    switch (code) {
      case CLOSED:
        setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
        break;
      default:
        throw new UnexpectedStateException(String.format(
          "Received report unexpected transition state=%s for region=%s server=%s, expected CLOSED.",
          code, regionNode.getRegionInfo(), regionNode.getRegionLocation()));
    }
  }

  /**
   * Our remote call failed but there are a few states where it is safe to proceed with the
   * unassign; e.g. if a server crash and it has had all of its WALs processed, then we can allow
   * this unassign to go to completion.
   * @return True if it is safe to proceed with the unassign.
   */
  private boolean isSafeToProceed(final MasterProcedureEnv env, final RegionStateNode regionNode,
    final IOException exception) {
    if (exception instanceof ServerCrashException) {
      // This exception comes from ServerCrashProcedure AFTER log splitting. Its a signaling
      // exception. SCP found this region as a RIT during its processing of the crash.  Its call
      // into here says it is ok to let this procedure go complete.
      LOG.info("Safe to let procedure move to next step; {}", this);
      return true;
    }
    if (exception instanceof NotServingRegionException) {
      LOG.warn("IS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD {}", regionNode, exception);
      return true;
    }
    return false;
  }

  /**
   * Set it up so when procedure is unsuspended, we'll move to the procedure finish.
   */
  protected void proceed(final MasterProcedureEnv env, final RegionStateNode regionNode) {
    try {
      reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
    } catch (UnexpectedStateException e) {
      // Should never happen.
      throw new RuntimeException(e);
    }
  }

  /**
   * @return If true, we will re-wake up this procedure; if false, the procedure stays suspended.
   */
  @Override
  protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
      final IOException exception) {
    // Be careful reading the below; we do returns in middle of the method a few times.
    if (isSafeToProceed(env, regionNode, exception)) {
      proceed(env, regionNode);
    } else if (exception instanceof RegionServerAbortedException ||
        exception instanceof RegionServerStoppedException) {
      // RS is aborting/stopping, we cannot offline the region since the region may need to do WAL
      // recovery. Until we see the RS expiration, stay suspended; return false.
      LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
      return false;
    } else if (exception instanceof ServerNotRunningYetException) {
      // This should not happen. If it does, procedure will be woken-up and we'll retry.
      // TODO: Needs a pause and backoff?
      LOG.info("Retry", exception);
    } else {
      // We failed to RPC this server. Set it as expired.
      ServerName serverName = regionNode.getRegionLocation();
      LOG.warn("Expiring {}, {} {}; exception={}", serverName, this, regionNode.toShortString(),
          exception.getClass().getSimpleName());
      if (!env.getMasterServices().getServerManager().expireServer(serverName)) {
        // Failed to queue an expire. Lots of possible reasons including it may be already expired.
        // In ServerCrashProcedure, there is a handleRIT stage where we
        // will iterator over all the RIT procedures for the related regions of a crashed RS and
        // fail them with ServerCrashException. You can see the isSafeToProceed method above for
        // more details.
        // This can work for most cases, but since we do not hold the region lock in handleRIT,
        // there could be race that we arrive here after the handleRIT stage of the SCP. So here we
        // need to check whether it is safe to quit.
        // Notice that, the first assumption is that we can only quit after the log splitting is
        // done, as MRP can schedule an AssignProcedure right after us, and if the log splitting has
        // not been done then there will be data loss. And in SCP, we will change the state from
        // SPLITTING to OFFLINE(or SPLITTING_META_DONE for meta log processing) after finishing the
        // log splitting, and then calling handleRIT, so checking the state here can be a safe
        // fence. If the state is not OFFLINE(or SPLITTING_META_DONE), then we can just leave this
        // procedure in suspended state as we can make sure that the handleRIT has not been executed
        // yet and it will wake us up later. And if the state is OFFLINE(or SPLITTING_META_DONE), we
        // can safely quit since there will be no data loss. There could be duplicated
        // AssignProcedures for the same region but it is OK as we will do a check at the beginning
        // of AssignProcedure to prevent double assign. And there we have region lock so there will
        // be no race.
        if (env.getAssignmentManager().isLogSplittingDone(serverName, isMeta())) {
          // Its ok to proceed with this unassign.
          LOG.info("{} is dead and processed; moving procedure to finished state; {}", serverName,
            this);
          proceed(env, regionNode);
          // Return true; wake up the procedure so we can act on proceed.
          return true;
        }
        LOG.info("Failed expiration and log splitting not done on {}", serverName);
      }
      // Return false so this procedure stays in suspended state. It will be woken up by the
      // ServerCrashProcedure that was scheduled when we called #expireServer above. SCP calls
      // #handleRIT which will call this method only the exception will be a ServerCrashException
      // this time around (See above).
      // TODO: Add a SCP as a new subprocedure that we now come to depend on.
      return false;
    }
    return true;
  }

  /**
   * Appends the hosting server to the superclass's string details.
   */
  @Override
  public void toStringClassDetails(StringBuilder sb) {
    super.toStringClassDetails(sb);
    sb.append(", server=").append(this.hostingServer);
  }

  /**
   * @return the region's current location as recorded in its RegionStateNode, or null if there
   *         is no RegionStateNode for this region.
   */
  @Override
  public ServerName getServer(final MasterProcedureEnv env) {
    RegionStateNode node =
        env.getAssignmentManager().getRegionStates().getRegionStateNode(this.getRegionInfo());
    if (node == null) {
      return null;
    }
    return node.getRegionLocation();
  }

  /**
   * @return the unassign procedure metrics from the AssignmentManager's metrics.
   */
  @Override
  protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
    return env.getAssignmentManager().getAssignmentManagerMetrics().getUnassignProcMetrics();
  }
}