001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.master.assignment;
021
022import java.io.IOException;
023
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.NotServingRegionException;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.client.RegionInfo;
028import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
029import org.apache.hadoop.hbase.favored.FavoredNodesManager;
030import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
031import org.apache.hadoop.hbase.master.RegionState.State;
032import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
033import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
034import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
035import org.apache.hadoop.hbase.master.procedure.ServerCrashException;
036import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
037import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
039import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
040import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UnassignRegionStateData;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
049
050/**
051 * Procedure that describes the unassignment of a single region.
052 * There can only be one RegionTransitionProcedure -- i.e. an assign or an unassign -- per region
053 * running at a time, since each procedure takes a lock on the region.
054 *
055 * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher
056 * queue, and the procedure will then go into a "waiting state" (suspend).
057 * The Remote Dispatcher will batch the various requests for that server and
058 * they will be sent to the RS for execution.
 * The RS will complete the close operation by calling master.reportRegionStateTransition().
060 * The AM will intercept the transition report, and notify this procedure.
061 * The procedure will wakeup and finish the unassign by publishing its new state on meta.
062 * <p>If we are unable to contact the remote regionserver whether because of ConnectException
063 * or socket timeout, we will call expire on the server we were trying to contact. We will remain
064 * in suspended state waiting for a wake up from the ServerCrashProcedure that is processing the
065 * failed server. The basic idea is that if we notice a crashed server, then we have a
066 * responsibility; i.e. we should not let go of the region until we are sure the server that was
067 * hosting has had its crash processed. If we let go of the region before then, an assign might
068 * run before the logs have been split which would make for data loss.
069 *
070 * <p>TODO: Rather than this tricky coordination between SCP and this Procedure, instead, work on
071 * returning a SCP as our subprocedure; probably needs work on the framework to do this,
 * especially if the SCP has already been created.
073 */
074@InterfaceAudience.Private
075public class UnassignProcedure extends RegionTransitionProcedure {
  /** Logger shared by all instances of this procedure. */
  private static final Logger LOG = LoggerFactory.getLogger(UnassignProcedure.class);

  /**
   * Where to send the unassign RPC.
   * This value may not be accurate, since another RegionTransitionProcedure may change the
   * location of the region. The hostingServer is refreshed in updateTransition.
   */
  protected volatile ServerName hostingServer;
  /**
   * The server we will subsequently assign the region to (can be null).
   */
  protected volatile ServerName destinationServer;

  /**
   * Whether to delete the region from the in-memory states after unassigning the region.
   */
  private boolean removeAfterUnassigning;
093
094  public UnassignProcedure() {
095    // Required by the Procedure framework to create the procedure on replay
096    super();
097  }
098
099  public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
100      final boolean force, final boolean removeAfterUnassigning) {
101    this(regionInfo, hostingServer, null, force, removeAfterUnassigning);
102  }
103
104  public UnassignProcedure(final RegionInfo regionInfo,
105      final ServerName hostingServer, final ServerName destinationServer, final boolean force) {
106    this(regionInfo, hostingServer, destinationServer, force, false);
107  }
108
109  public UnassignProcedure(final RegionInfo regionInfo, final ServerName hostingServer,
110      final ServerName destinationServer, final boolean override,
111      final boolean removeAfterUnassigning) {
112    super(regionInfo, override);
113    this.hostingServer = hostingServer;
114    this.destinationServer = destinationServer;
115    this.removeAfterUnassigning = removeAfterUnassigning;
116
117    // we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request
118    setTransitionState(RegionTransitionState.REGION_TRANSITION_DISPATCH);
119  }
120
121  @Override
122  public TableOperationType getTableOperationType() {
123    return TableOperationType.REGION_UNASSIGN;
124  }
125
126  @Override
127  protected boolean isRollbackSupported(final RegionTransitionState state) {
128    switch (state) {
129      case REGION_TRANSITION_QUEUE:
130      case REGION_TRANSITION_DISPATCH:
131        return true;
132      default:
133        return false;
134    }
135  }
136
137  @Override
138  protected void serializeStateData(ProcedureStateSerializer serializer)
139      throws IOException {
140    UnassignRegionStateData.Builder state = UnassignRegionStateData.newBuilder()
141        .setTransitionState(getTransitionState())
142        .setRegionInfo(ProtobufUtil.toRegionInfo(getRegionInfo()));
143    if (this.hostingServer != null) {
144      state.setHostingServer(ProtobufUtil.toServerName(this.hostingServer));
145    }
146    if (this.destinationServer != null) {
147      state.setDestinationServer(ProtobufUtil.toServerName(destinationServer));
148    }
149    if (isOverride()) {
150      state.setForce(true);
151    }
152    if (removeAfterUnassigning) {
153      state.setRemoveAfterUnassigning(true);
154    }
155    if (getAttempt() > 0) {
156      state.setAttempt(getAttempt());
157    }
158    serializer.serialize(state.build());
159  }
160
161  @Override
162  protected void deserializeStateData(ProcedureStateSerializer serializer)
163      throws IOException {
164    final UnassignRegionStateData state =
165        serializer.deserialize(UnassignRegionStateData.class);
166    setTransitionState(state.getTransitionState());
167    setRegionInfo(ProtobufUtil.toRegionInfo(state.getRegionInfo()));
168    // The 'force' flag is the override flag in unassign.
169    setOverride(state.getForce());
170    this.hostingServer =
171        state.hasHostingServer()? ProtobufUtil.toServerName(state.getHostingServer()): null;
172    if (state.hasDestinationServer()) {
173      this.destinationServer = ProtobufUtil.toServerName(state.getDestinationServer());
174    }
175    removeAfterUnassigning = state.getRemoveAfterUnassigning();
176    if (state.hasAttempt()) {
177      setAttempt(state.getAttempt());
178    }
179  }
180
181  @Override
182  protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) {
183    // nothing to do here. we skip the step in the constructor
184    // by jumping to REGION_TRANSITION_DISPATCH
185    throw new UnsupportedOperationException();
186  }
187
188  @Override
189  protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
190        throws IOException {
191    // if the region is already closed or offline we can't do much...
192    if (regionNode.isInState(State.CLOSED, State.OFFLINE)) {
193      LOG.info("Not unassigned " + this + "; " + regionNode.toShortString());
194      return false;
195    }
196
197    // if we haven't started the operation yet, we can abort
198    if (aborted.get() && regionNode.isInState(State.OPEN)) {
199      setAbortFailure(getClass().getSimpleName(), "abort requested");
200      return false;
201    }
202
203    if (regionNode.getRegionLocation() != null && !regionNode
204        .getRegionLocation().equals(hostingServer)) {
205      LOG.info("HostingServer changed from {} to {} for {}", hostingServer,
206          regionNode.getRegionLocation(), this);
207      this.hostingServer = regionNode.getRegionLocation();
208    }
209
210
211    // Mark the region as CLOSING.
212    env.getAssignmentManager().markRegionAsClosing(regionNode);
213
214    // Add the close region operation to the server dispatch queue.
215    if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
216      // If addToRemoteDispatcher fails, it calls the callback #remoteCallFailed.
217    }
218
219    // Return true to keep the procedure running.
220    return true;
221  }
222
223  @Override
224  protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
225      throws IOException {
226    AssignmentManager am = env.getAssignmentManager();
227    RegionInfo regionInfo = getRegionInfo();
228
229    if (!removeAfterUnassigning) {
230      am.markRegionAsClosed(regionNode);
231    } else {
232      // Remove from in-memory states
233      am.getRegionStates().deleteRegion(regionInfo);
234      am.getRegionStates().removeRegionFromServer(regionNode.getRegionLocation(), regionNode);
235      env.getMasterServices().getServerManager().removeRegion(regionInfo);
236      FavoredNodesManager fnm = env.getMasterServices().getFavoredNodesManager();
237      if (fnm != null) {
238        fnm.deleteFavoredNodesForRegions(Lists.newArrayList(regionInfo));
239      }
240    }
241  }
242
243  @Override
244  public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) {
245    assert serverName.equals(getRegionState(env).getRegionLocation());
246    return new RegionCloseOperation(this, getRegionInfo(), this.destinationServer);
247  }
248
249  @Override
250  protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode,
251      final TransitionCode code, final long seqId) throws UnexpectedStateException {
252    switch (code) {
253      case CLOSED:
254        setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
255        break;
256      default:
257        throw new UnexpectedStateException(String.format(
258          "Received report unexpected transition state=%s for region=%s server=%s, expected CLOSED.",
259          code, regionNode.getRegionInfo(), regionNode.getRegionLocation()));
260    }
261  }
262
263  /**
264   * Our remote call failed but there are a few states where it is safe to proceed with the
265   * unassign; e.g. if a server crash and it has had all of its WALs processed, then we can allow
266   * this unassign to go to completion.
267   * @return True if it is safe to proceed with the unassign.
268   */
269  private boolean isSafeToProceed(final MasterProcedureEnv env, final RegionStateNode regionNode,
270    final IOException exception) {
271    if (exception instanceof ServerCrashException) {
272      // This exception comes from ServerCrashProcedure AFTER log splitting. Its a signaling
273      // exception. SCP found this region as a RIT during its processing of the crash.  Its call
274      // into here says it is ok to let this procedure go complete.
275      LOG.info("Safe to let procedure move to next step; {}", this);
276      return true;
277    }
278    if (exception instanceof NotServingRegionException) {
279      LOG.warn("IS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD {}", regionNode, exception);
280      return true;
281    }
282    return false;
283  }
284
285  /**
286   * Set it up so when procedure is unsuspended, we'll move to the procedure finish.
287   */
288  protected void proceed(final MasterProcedureEnv env, final RegionStateNode regionNode) {
289    try {
290      reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM);
291    } catch (UnexpectedStateException e) {
292      // Should never happen.
293      throw new RuntimeException(e);
294    }
295  }
296
297  /**
298   * @return If true, we will re-wake up this procedure; if false, the procedure stays suspended.
299   */
300  @Override
301  protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
302      final IOException exception) {
303    // Be careful reading the below; we do returns in middle of the method a few times.
304    if (isSafeToProceed(env, regionNode, exception)) {
305      proceed(env, regionNode);
306    } else if (exception instanceof RegionServerAbortedException ||
307        exception instanceof RegionServerStoppedException) {
308      // RS is aborting/stopping, we cannot offline the region since the region may need to do WAL
309      // recovery. Until we see the RS expiration, stay suspended; return false.
310      LOG.info("Ignoring; waiting on ServerCrashProcedure", exception);
311      return false;
312    } else if (exception instanceof ServerNotRunningYetException) {
313      // This should not happen. If it does, procedure will be woken-up and we'll retry.
314      // TODO: Needs a pause and backoff?
315      LOG.info("Retry", exception);
316    } else {
317      // We failed to RPC this server. Set it as expired.
318      ServerName serverName = regionNode.getRegionLocation();
319      LOG.warn("Expiring {}, {} {}; exception={}", serverName, this, regionNode.toShortString(),
320          exception.getClass().getSimpleName());
321      if (!env.getMasterServices().getServerManager().expireServer(serverName)) {
322        // Failed to queue an expire. Lots of possible reasons including it may be already expired.
323        // In ServerCrashProcedure, there is a handleRIT stage where we
324        // will iterator over all the RIT procedures for the related regions of a crashed RS and
325        // fail them with ServerCrashException. You can see the isSafeToProceed method above for
326        // more details.
327        // This can work for most cases, but since we do not hold the region lock in handleRIT,
328        // there could be race that we arrive here after the handleRIT stage of the SCP. So here we
329        // need to check whether it is safe to quit.
330        // Notice that, the first assumption is that we can only quit after the log splitting is
331        // done, as MRP can schedule an AssignProcedure right after us, and if the log splitting has
332        // not been done then there will be data loss. And in SCP, we will change the state from
333        // SPLITTING to OFFLINE(or SPLITTING_META_DONE for meta log processing) after finishing the
334        // log splitting, and then calling handleRIT, so checking the state here can be a safe
335        // fence. If the state is not OFFLINE(or SPLITTING_META_DONE), then we can just leave this
336        // procedure in suspended state as we can make sure that the handleRIT has not been executed
337        // yet and it will wake us up later. And if the state is OFFLINE(or SPLITTING_META_DONE), we
338        // can safely quit since there will be no data loss. There could be duplicated
339        // AssignProcedures for the same region but it is OK as we will do a check at the beginning
340        // of AssignProcedure to prevent double assign. And there we have region lock so there will
341        // be no race.
342        if (env.getAssignmentManager().isLogSplittingDone(serverName, isMeta())) {
343          // Its ok to proceed with this unassign.
344          LOG.info("{} is dead and processed; moving procedure to finished state; {}", serverName,
345            this);
346          proceed(env, regionNode);
347          // Return true; wake up the procedure so we can act on proceed.
348          return true;
349        }
350        LOG.info("Failed expiration and log splitting not done on {}", serverName);
351      }
352      // Return false so this procedure stays in suspended state. It will be woken up by the
353      // ServerCrashProcedure that was scheduled when we called #expireServer above. SCP calls
354      // #handleRIT which will call this method only the exception will be a ServerCrashException
355      // this time around (See above).
356      // TODO: Add a SCP as a new subprocedure that we now come to depend on.
357      return false;
358    }
359    return true;
360  }
361
362  @Override
363  public void toStringClassDetails(StringBuilder sb) {
364    super.toStringClassDetails(sb);
365    sb.append(", server=").append(this.hostingServer);
366  }
367
368  @Override
369  public ServerName getServer(final MasterProcedureEnv env) {
370    RegionStateNode node =
371        env.getAssignmentManager().getRegionStates().getRegionStateNode(this.getRegionInfo());
372    if (node == null) {
373      return null;
374    }
375    return node.getRegionLocation();
376  }
377
378  @Override
379  protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
380    return env.getAssignmentManager().getAssignmentManagerMetrics().getUnassignProcMetrics();
381  }
382}