001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.master.assignment;
021
022import java.io.IOException;
023
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.NotServingRegionException;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.client.RegionInfo;
028import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
029import org.apache.hadoop.hbase.favored.FavoredNodesManager;
030import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
031import org.apache.hadoop.hbase.master.RegionState.State;
032import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
033import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
034import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
035import org.apache.hadoop.hbase.master.procedure.ServerCrashException;
036import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
037import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
039import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
040import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UnassignRegionStateData;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
049
050/**
051 * Procedure that describes the unassignment of a single region.
052 * There can only be one RegionTransitionProcedure -- i.e. an assign or an unassign -- per region
053 * running at a time, since each procedure takes a lock on the region.
054 *
055 * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
056 * queue, and the procedure will then go into a "waiting state" (suspend).
057 * The Remote Dispatcher will batch the various requests for that server and
058 * they will be sent to the RS for execution.
059 * The RS will complete the open operation by calling master.reportRegionStateTransition().
060 * The AM will intercept the transition report, and notify this procedure.
061 * The procedure will wakeup and finish the unassign by publishing its new state on meta.
062 * <p>If we are unable to contact the remote regionserver whether because of ConnectException
063 * or socket timeout, we will call expire on the server we were trying to contact. We will remain
064 * in suspended state waiting for a wake up from the ServerCrashProcedure that is processing the
065 * failed server. The basic idea is that if we notice a crashed server, then we have a
066 * responsibility; i.e. we should not let go of the region until we are sure the server that was
067 * hosting has had its crash processed. If we let go of the region before then, an assign might
068 * run before the logs have been split which would make for data loss.
069 *
070 * <p>TODO: Rather than this tricky coordination between SCP and this Procedure, instead, work on
071 * returning a SCP as our subprocedure; probably needs work on the framework to do this,
072 * especially if the SCP already created.
073 */
074@InterfaceAudience.Private
075public class UnassignProcedure extends RegionTransitionProcedure {
076  private static final Logger LOG = LoggerFactory.getLogger(UnassignProcedure.class);
077
078  /**
079   * Where to send the unassign RPC.
080   * this one may not accurate since another RTP may change this location for
081   * the region. The hostingServer will be updated in updateTransition
082   */
083  protected volatile ServerName hostingServer;
084  /**
085   * The Server we will subsequently assign the region too (can be null).
086   */
087  protected volatile ServerName destinationServer;
088
089  /**
090   * Whether deleting the region from in-memory states after unassigning the region.
091   */
092  private boolean removeAfterUnassigning;
093
094  public UnassignProcedure() {
095    // Required by the Procedure framework to create the procedure on replay
096    super();
097  }
098
099  public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
100      final boolean force, final boolean removeAfterUnassigning) {
101    this(regionInfo, hostingServer, null, force, removeAfterUnassigning);
102  }
103
104  public UnassignProcedure(final RegionInfo regionInfo,
105      final ServerName hostingServer, final ServerName destinationServer, final boolean force) {
106    this(regionInfo, hostingServer, destinationServer, force, false);
107  }
108
109  public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
110      final ServerName destinationServer, final boolean override,
111      final boolean removeAfterUnassigning) {
112    super(regionInfo, override);
113    this.hostingServer = hostingServer;
114    this.destinationServer = destinationServer;
115    this.removeAfterUnassigning = removeAfterUnassigning;
116
117    // we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request
118    setTransitionState(RegionTransitionState.REGION_TRANSITION_DISPATCH);
119  }
120
121  @Override
122  public TableOperationType getTableOperationType() {
123    return TableOperationType.REGION_UNASSIGN;
124  }
125
126  @Override
127  protected boolean isRollbackSupported(final RegionTransitionState state) {
128    switch (state) {
129      case REGION_TRANSITION_QUEUE:
130      case REGION_TRANSITION_DISPATCH:
131        return true;
132      default:
133        return false;
134    }
135  }
136
137  @Override
138  protected void serializeStateData(ProcedureStateSerializer serializer)
139      throws IOException {
140    UnassignRegionStateData.Builder state = UnassignRegionStateData.newBuilder()
141        .setTransitionState(getTransitionState())
142        .setRegionInfo(ProtobufUtil.toRegionInfo(getRegionInfo()));
143    if (this.hostingServer != null) {
144      state.setHostingServer(ProtobufUtil.toServerName(this.hostingServer));
145    }
146    if (this.destinationServer != null) {
147      state.setDestinationServer(ProtobufUtil.toServerName(destinationServer));
148    }
149    if (isOverride()) {
150      state.setForce(true);
151    }
152    if (removeAfterUnassigning) {
153      state.setRemoveAfterUnassigning(true);
154    }
155    if (getAttempt() > 0) {
156      state.setAttempt(getAttempt());
157    }
158    serializer.serialize(state.build());
159  }
160
161  @Override
162  protected void deserializeStateData(ProcedureStateSerializer serializer)
163      throws IOException {
164    final UnassignRegionStateData state =
165        serializer.deserialize(UnassignRegionStateData.class);
166    setTransitionState(state.getTransitionState());
167    setRegionInfo(ProtobufUtil.toRegionInfo(state.getRegionInfo()));
168    // The 'force' flag is the override flag in unassign.
169    setOverride(state.getForce());
170    this.hostingServer =
171        state.hasHostingServer()? ProtobufUtil.toServerName(state.getHostingServer()): null;
172    if (state.hasDestinationServer()) {
173      this.destinationServer = ProtobufUtil.toServerName(state.getDestinationServer());
174    }
175    removeAfterUnassigning = state.getRemoveAfterUnassigning();
176    if (state.hasAttempt()) {
177      setAttempt(state.getAttempt());
178    }
179  }
180
181  @Override
182  protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) {
183    // nothing to do here. we skip the step in the constructor
184    // by jumping to REGION_TRANSITION_DISPATCH
185    throw new UnsupportedOperationException();
186  }
187
188  @Override
189  protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
190        throws IOException {
191    // if the region is already closed or offline we can't do much...
192    if (regionNode.isInState(State.CLOSED, State.OFFLINE)) {
193      LOG.info("Not unassigned " + this + "; " + regionNode.toShortString());
194      return false;
195    }
196
197    // if we haven't started the operation yet, we can abort
198    if (aborted.get() && regionNode.isInState(State.OPEN)) {
199      setAbortFailure(getClass().getSimpleName(), "abort requested");
200      return false;
201    }
202
203    if (regionNode.getRegionLocation() != null && !regionNode
204        .getRegionLocation().equals(hostingServer)) {
205      LOG.info("HostingServer changed from {} to {} for {}", hostingServer,
206          regionNode.getRegionLocation(), this);
207      this.hostingServer = regionNode.getRegionLocation();
208    }
209
210
211    // Mark the region as CLOSING.
212    env.getAssignmentManager().markRegionAsClosing(regionNode);
213
214    // Add the close region operation to the server dispatch queue.
215    if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
216      // If addToRemoteDispatcher fails, it calls the callback #remoteCallFailed.
217    }
218
219    // Return true to keep the procedure running.
220    return true;
221  }
222
223  @Override
224  protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
225      throws IOException {
226    AssignmentManager am = env.getAssignmentManager();
227    RegionInfo regionInfo = getRegionInfo();
228
229    if (!removeAfterUnassigning) {
230      am.markRegionAsClosed(regionNode);
231    } else {
232      // Remove from in-memory states
233      am.getRegionStates().deleteRegion(regionInfo);
234      env.getMasterServices().getServerManager().removeRegion(regionInfo);
235      FavoredNodesManager fnm = env.getMasterServices().getFavoredNodesManager();
236      if (fnm != null) {
237        fnm.deleteFavoredNodesForRegions(Lists.newArrayList(regionInfo));
238      }
239    }
240  }
241
242  @Override
243  public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) {
244    assert serverName.equals(getRegionState(env).getRegionLocation());
245    return new RegionCloseOperation(this, getRegionInfo(), this.destinationServer);
246  }
247
248  @Override
249  protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode,
250      final TransitionCode code, final long seqId) throws UnexpectedStateException {
251    switch (code) {
252      case CLOSED:
253        setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
254        break;
255      default:
256        throw new UnexpectedStateException(String.format(
257          "Received report unexpected transition state=%s for region=%s server=%s, expected CLOSED.",
258          code, regionNode.getRegionInfo(), regionNode.getRegionLocation()));
259    }
260  }
261
262  /**
263   * Our remote call failed but there are a few states where it is safe to proceed with the
264   * unassign; e.g. if a server crash and it has had all of its WALs processed, then we can allow
265   * this unassign to go to completion.
266   * @return True if it is safe to proceed with the unassign.
267   */
268  private boolean isSafeToProceed(final MasterProcedureEnv env, final RegionStateNode regionNode,
269    final IOException exception) {
270    if (exception instanceof ServerCrashException) {
271      // This exception comes from ServerCrashProcedure AFTER log splitting. Its a signaling
272      // exception. SCP found this region as a RIT during its processing of the crash.  Its call
273      // into here says it is ok to let this procedure go complete.
274      LOG.info("Safe to let procedure move to next step; {}", this);
275      return true;
276    }
277    if (exception instanceof NotServingRegionException) {
278      LOG.warn("IS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD {}", regionNode, exception);
279      return true;
280    }
281    return false;
282  }
283
284  /**
285   * Set it up so when procedure is unsuspended, we'll move to the procedure finish.
286   */
287  protected void proceed(final MasterProcedureEnv env, final RegionStateNode regionNode) {
288    try {
289      reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
290    } catch (UnexpectedStateException e) {
291      // Should never happen.
292      throw new RuntimeException(e);
293    }
294  }
295
296  /**
297   * @return If true, we will re-wake up this procedure; if false, the procedure stays suspended.
298   */
299  @Override
300  protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
301      final IOException exception) {
302    // Be careful reading the below; we do returns in middle of the method a few times.
303    if (isSafeToProceed(env, regionNode, exception)) {
304      proceed(env, regionNode);
305    } else if (exception instanceof RegionServerAbortedException ||
306        exception instanceof RegionServerStoppedException) {
307      // RS is aborting/stopping, we cannot offline the region since the region may need to do WAL
308      // recovery. Until we see the RS expiration, stay suspended; return false.
309      LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
310      return false;
311    } else if (exception instanceof ServerNotRunningYetException) {
312      // This should not happen. If it does, procedure will be woken-up and we'll retry.
313      // TODO: Needs a pause and backoff?
314      LOG.info("Retry", exception);
315    } else {
316      // We failed to RPC this server. Set it as expired.
317      ServerName serverName = regionNode.getRegionLocation();
318      LOG.warn("Expiring {}, {} {}; exception={}", serverName, this, regionNode.toShortString(),
319          exception.getClass().getSimpleName());
320      if (!env.getMasterServices().getServerManager().expireServer(serverName)) {
321        // Failed to queue an expire. Lots of possible reasons including it may be already expired.
322        // In ServerCrashProcedure, there is a handleRIT stage where we
323        // will iterator over all the RIT procedures for the related regions of a crashed RS and
324        // fail them with ServerCrashException. You can see the isSafeToProceed method above for
325        // more details.
326        // This can work for most cases, but since we do not hold the region lock in handleRIT,
327        // there could be race that we arrive here after the handleRIT stage of the SCP. So here we
328        // need to check whether it is safe to quit.
329        // Notice that, the first assumption is that we can only quit after the log splitting is
330        // done, as MRP can schedule an AssignProcedure right after us, and if the log splitting has
331        // not been done then there will be data loss. And in SCP, we will change the state from
332        // SPLITTING to OFFLINE(or SPLITTING_META_DONE for meta log processing) after finishing the
333        // log splitting, and then calling handleRIT, so checking the state here can be a safe
334        // fence. If the state is not OFFLINE(or SPLITTING_META_DONE), then we can just leave this
335        // procedure in suspended state as we can make sure that the handleRIT has not been executed
336        // yet and it will wake us up later. And if the state is OFFLINE(or SPLITTING_META_DONE), we
337        // can safely quit since there will be no data loss. There could be duplicated
338        // AssignProcedures for the same region but it is OK as we will do a check at the beginning
339        // of AssignProcedure to prevent double assign. And there we have region lock so there will
340        // be no race.
341        if (env.getAssignmentManager().isLogSplittingDone(serverName, isMeta())) {
342          // Its ok to proceed with this unassign.
343          LOG.info("{} is dead and processed; moving procedure to finished state; {}", serverName,
344            this);
345          proceed(env, regionNode);
346          // Return true; wake up the procedure so we can act on proceed.
347          return true;
348        }
349        LOG.info("Failed expiration and log splitting not done on {}", serverName);
350      }
351      // Return false so this procedure stays in suspended state. It will be woken up by the
352      // ServerCrashProcedure that was scheduled when we called #expireServer above. SCP calls
353      // #handleRIT which will call this method only the exception will be a ServerCrashException
354      // this time around (See above).
355      // TODO: Add a SCP as a new subprocedure that we now come to depend on.
356      return false;
357    }
358    return true;
359  }
360
361  @Override
362  public void toStringClassDetails(StringBuilder sb) {
363    super.toStringClassDetails(sb);
364    sb.append(", server=").append(this.hostingServer);
365  }
366
367  @Override
368  public ServerName getServer(final MasterProcedureEnv env) {
369    RegionStateNode node =
370        env.getAssignmentManager().getRegionStates().getRegionStateNode(this.getRegionInfo());
371    if (node == null) {
372      return null;
373    }
374    return node.getRegionLocation();
375  }
376
377  @Override
378  protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
379    return env.getAssignmentManager().getAssignmentManagerMetrics().getUnassignProcMetrics();
380  }
381}