001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.client.RegionInfoBuilder;
030import org.apache.hadoop.hbase.client.RegionReplicaUtil;
031import org.apache.hadoop.hbase.master.MasterServices;
032import org.apache.hadoop.hbase.master.MasterWalManager;
033import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
034import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
035import org.apache.hadoop.hbase.master.assignment.RegionTransitionProcedure;
036import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
037import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
038import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
039import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
040import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
048
049/**
050 * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
051 * ServerShutdownHandler.
052 *
053 * <p>The procedure flow varies dependent on whether meta is assigned and if we are to split logs.
054 *
055 * <p>We come in here after ServerManager has noticed a server has expired. Procedures
056 * queued on the rpc should have been notified about fail and should be concurrently
057 * getting themselves ready to assign elsewhere.
058 */
059@InterfaceAudience.Private
060public class ServerCrashProcedure
061    extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState>
062    implements ServerProcedureInterface {
063  private static final Logger LOG = LoggerFactory.getLogger(ServerCrashProcedure.class);
064
  /**
   * Name of the crashed server to process.
   */
  private ServerName serverName;

  /**
   * Whether DeadServer knows that we are processing it. This flag is not written out by
   * {@link #serializeStateData(ProcedureStateSerializer)}, so a deserialized instance starts
   * with it unset and notifies again on its first executed step.
   */
  private boolean notifiedDeadServer = false;

  /**
   * Regions that were on the crashed server. Populated in the SERVER_CRASH_GET_REGIONS step (or
   * when deserializing); stays null until then.
   */
  private List<RegionInfo> regionsOnCrashedServer;

  /**
   * True if the crashed server was carrying the hbase:meta table region.
   */
  private boolean carryingMeta = false;

  /**
   * True if we should split WALs as part of crashed server processing.
   */
  private boolean shouldSplitWal;
082
083  /**
084   * Call this constructor queuing up a Procedure.
085   * @param serverName Name of the crashed server.
086   * @param shouldSplitWal True if we should split WALs as part of crashed server processing.
087   * @param carryingMeta True if carrying hbase:meta table region.
088   */
089  public ServerCrashProcedure(final MasterProcedureEnv env, final ServerName serverName,
090      final boolean shouldSplitWal, final boolean carryingMeta) {
091    this.serverName = serverName;
092    this.shouldSplitWal = shouldSplitWal;
093    this.carryingMeta = carryingMeta;
094    this.setOwner(env.getRequestUser());
095  }
096
097  /**
098   * Used when deserializing from a procedure store; we'll construct one of these then call
099   * #deserializeStateData(InputStream). Do not use directly.
100   */
101  public ServerCrashProcedure() {
102    super();
103  }
104
  /**
   * Runs a single step of the crash-handling state machine.
   *
   * <p>Before dispatching on <code>state</code>, the dead-server tracker is notified once per
   * instance that this server is being processed. Each case then does its work and picks the
   * next state; SERVER_CRASH_FINISH cleans up and ends the procedure.
   *
   * <p>An IOException thrown by a step is caught and logged at WARN. In the helpers visible here
   * the exception escapes before <code>setNextState</code> is reached, and the log message states
   * that the step will be retried.
   *
   * @param env Procedure environment giving access to master services.
   * @param state State to execute.
   * @return {@link Flow#NO_MORE_STATE} after SERVER_CRASH_FINISH, otherwise
   *   {@link Flow#HAS_MORE_STATE}.
   * @throws ProcedureSuspendedException in SERVER_CRASH_GET_REGIONS when
   *   <code>waitMetaLoaded</code> reports that we have to wait.
   * @throws ProcedureYieldException declared by the overridden method; not thrown by the code
   *   in this method.
   * @throws UnsupportedOperationException if <code>state</code> is not handled by the switch.
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
      throws ProcedureSuspendedException, ProcedureYieldException {
    final MasterServices services = env.getMasterServices();
    // HBASE-14802
    // If we have not yet notified that we are processing a dead server, we should do now.
    if (!notifiedDeadServer) {
      services.getServerManager().getDeadServers().notifyServer(serverName);
      notifiedDeadServer = true;
    }

    try {
      switch (state) {
        case SERVER_CRASH_START:
          LOG.info("Start " + this);
          // If carrying meta, process it first. Else, get list of regions on crashed server.
          if (this.carryingMeta) {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
          }
          break;
        case SERVER_CRASH_SPLIT_META_LOGS:
          // Only reached when carryingMeta is set (see SERVER_CRASH_START).
          splitMetaLogs(env);
          setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
          break;
        case SERVER_CRASH_ASSIGN_META:
          // Notify any region-in-transition procedure for hbase:meta that targets the crashed
          // server, then queue an assign of hbase:meta as a child procedure.
          // NOTE(review): the list returned by handleRIT is ignored here, so the assign below is
          // queued even if handleRIT decided an existing procedure already covers meta --
          // confirm this is intended.
          handleRIT(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
          addChildProcedure(env.getAssignmentManager()
            .createAssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO));
          setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
          break;
        case SERVER_CRASH_PROCESS_META:
          // not used any more but still leave it here to keep compatible as there maybe old SCP
          // which is stored in ProcedureStore which has this state.
          processMeta(env);
          setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
          break;
        case SERVER_CRASH_GET_REGIONS:
          // If hbase:meta is not assigned, yield.
          if (env.getAssignmentManager().waitMetaLoaded(this)) {
            throw new ProcedureSuspendedException();
          }
          // Record the regions that were on the crashed server; later steps work off this list.
          this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates()
            .getServerRegionInfoSet(serverName);
          // Where to go next? Depends on whether we should split logs at all or
          // if we should do distributed log splitting.
          if (!this.shouldSplitWal) {
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
          }
          break;
        case SERVER_CRASH_SPLIT_LOGS:
          splitLogs(env);
          setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
          break;
        case SERVER_CRASH_ASSIGN:
          // If no regions to assign, skip assign and skip to the finish.
          // Filter out meta regions. Those are handled elsewhere in this procedure.
          // Filter changes this.regionsOnCrashedServer.
          if (filterDefaultMetaRegions(regionsOnCrashedServer)) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Assigning regions " +
                RegionInfo.getShortNameToLog(regionsOnCrashedServer) + ", " + this +
                "; cycles=" + getCycles());
            }
            // Handle RIT against crashed server. Will cancel any ongoing assigns/unassigns.
            // Returns list of regions we need to reassign.
            // NOTE: there is nothing to stop a dispatch happening AFTER this point. Check for the
            // condition if a dispatch RPC fails inside in AssignProcedure/UnassignProcedure.
            // AssignProcedure just keeps retrying. UnassignProcedure is more complicated. See where
            // it does the check by calling am#isLogSplittingDone.
            List<RegionInfo> toAssign = handleRIT(env, regionsOnCrashedServer);
            AssignmentManager am = env.getAssignmentManager();
            // Do not create assigns for Regions on disabling or disabled Tables.
            // We do this inside in the AssignProcedure.
            // Remember the size before filtering so the number of dropped regions can be logged.
            int size = toAssign.size();
            if (toAssign.removeIf(r -> !AssignProcedure.assign(env.getMasterServices(), r))) {
              LOG.debug("Dropped {} assigns because against disabling/disabled tables",
                  size - toAssign.size());
            }
            // CreateAssignProcedure will try to use the old location for the region deploy.
            addChildProcedure(am.createAssignProcedures(toAssign));
            setNextState(ServerCrashState.SERVER_CRASH_HANDLE_RIT2);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_FINISH);
          }
          break;
        case SERVER_CRASH_HANDLE_RIT2:
          // Noop. Left in place because we used to call handleRIT here for a second time
          // but no longer necessary since HBASE-20634.
          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
          break;
        case SERVER_CRASH_FINISH:
          // Drop the crashed server from the region states and tell the dead-server tracker
          // that processing of it is finished.
          services.getAssignmentManager().getRegionStates().removeServer(serverName);
          services.getServerManager().getDeadServers().finish(serverName);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (IOException e) {
      // Deliberately not rethrown: log and fall through to HAS_MORE_STATE.
      LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + getCycles(), e);
    }
    return Flow.HAS_MORE_STATE;
  }
211
212  private void processMeta(final MasterProcedureEnv env) throws IOException {
213    LOG.debug("{}; processing hbase:meta", this);
214
215    // Assign meta if still carrying it. Check again: region may be assigned because of RIT timeout
216    final AssignmentManager am = env.getMasterServices().getAssignmentManager();
217    for (RegionInfo hri: am.getRegionStates().getServerRegionInfoSet(serverName)) {
218      if (!isDefaultMetaRegion(hri)) {
219        continue;
220      }
221      addChildProcedure(new RecoverMetaProcedure(serverName, this.shouldSplitWal));
222    }
223  }
224
225  private boolean filterDefaultMetaRegions(final List<RegionInfo> regions) {
226    if (regions == null) return false;
227    regions.removeIf(this::isDefaultMetaRegion);
228    return !regions.isEmpty();
229  }
230
231  private boolean isDefaultMetaRegion(final RegionInfo hri) {
232    return hri.getTable().equals(TableName.META_TABLE_NAME) &&
233      RegionReplicaUtil.isDefaultReplica(hri);
234  }
235
236  private void splitMetaLogs(MasterProcedureEnv env) throws IOException {
237    LOG.debug("Splitting meta WALs {}", this);
238    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
239    AssignmentManager am = env.getMasterServices().getAssignmentManager();
240    am.getRegionStates().metaLogSplitting(serverName);
241    mwm.splitMetaLog(serverName);
242    am.getRegionStates().metaLogSplit(serverName);
243    LOG.debug("Done splitting meta WALs {}", this);
244  }
245
246  private void splitLogs(final MasterProcedureEnv env) throws IOException {
247    LOG.debug("Splitting WALs {}", this);
248    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
249    AssignmentManager am = env.getMasterServices().getAssignmentManager();
250    // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
251    // PROBLEM!!! WE BLOCK HERE.
252    am.getRegionStates().logSplitting(this.serverName);
253    mwm.splitLog(this.serverName);
254    if (!carryingMeta) {
255      mwm.archiveMetaLog(this.serverName);
256    }
257    am.getRegionStates().logSplit(this.serverName);
258    LOG.debug("Done splitting WALs {}", this);
259  }
260
261  @Override
262  protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
263  throws IOException {
264    // Can't rollback.
265    throw new UnsupportedOperationException("unhandled state=" + state);
266  }
267
268  @Override
269  protected ServerCrashState getState(int stateId) {
270    return ServerCrashState.forNumber(stateId);
271  }
272
273  @Override
274  protected int getStateId(ServerCrashState state) {
275    return state.getNumber();
276  }
277
278  @Override
279  protected ServerCrashState getInitialState() {
280    return ServerCrashState.SERVER_CRASH_START;
281  }
282
283  @Override
284  protected boolean abort(MasterProcedureEnv env) {
285    // TODO
286    return false;
287  }
288
289  @Override
290  protected LockState acquireLock(final MasterProcedureEnv env) {
291    if (env.getProcedureScheduler().waitServerExclusiveLock(this, getServerName())) {
292      return LockState.LOCK_EVENT_WAIT;
293    }
294    return LockState.LOCK_ACQUIRED;
295  }
296
297  @Override
298  protected void releaseLock(final MasterProcedureEnv env) {
299    env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName());
300  }
301
302  @Override
303  public void toStringClassDetails(StringBuilder sb) {
304    sb.append(getClass().getSimpleName());
305    sb.append(" server=");
306    sb.append(serverName);
307    sb.append(", splitWal=");
308    sb.append(shouldSplitWal);
309    sb.append(", meta=");
310    sb.append(carryingMeta);
311  }
312
313  @Override
314  protected void serializeStateData(ProcedureStateSerializer serializer)
315      throws IOException {
316    super.serializeStateData(serializer);
317
318    MasterProcedureProtos.ServerCrashStateData.Builder state =
319      MasterProcedureProtos.ServerCrashStateData.newBuilder().
320      setServerName(ProtobufUtil.toServerName(this.serverName)).
321      setCarryingMeta(this.carryingMeta).
322      setShouldSplitWal(this.shouldSplitWal);
323    if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
324      for (RegionInfo hri: this.regionsOnCrashedServer) {
325        state.addRegionsOnCrashedServer(ProtobufUtil.toRegionInfo(hri));
326      }
327    }
328    serializer.serialize(state.build());
329  }
330
331  @Override
332  protected void deserializeStateData(ProcedureStateSerializer serializer)
333      throws IOException {
334    super.deserializeStateData(serializer);
335
336    MasterProcedureProtos.ServerCrashStateData state =
337        serializer.deserialize(MasterProcedureProtos.ServerCrashStateData.class);
338    this.serverName = ProtobufUtil.toServerName(state.getServerName());
339    this.carryingMeta = state.hasCarryingMeta()? state.getCarryingMeta(): false;
340    // shouldSplitWAL has a default over in pb so this invocation will always work.
341    this.shouldSplitWal = state.getShouldSplitWal();
342    int size = state.getRegionsOnCrashedServerCount();
343    if (size > 0) {
344      this.regionsOnCrashedServer = new ArrayList<>(size);
345      for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri: state.getRegionsOnCrashedServerList()) {
346        this.regionsOnCrashedServer.add(ProtobufUtil.toRegionInfo(ri));
347      }
348    }
349  }
350
351  @Override
352  public ServerName getServerName() {
353    return this.serverName;
354  }
355
356  @Override
357  public boolean hasMetaTableRegion() {
358    return this.carryingMeta;
359  }
360
361  @Override
362  public ServerOperationType getServerOperationType() {
363    return ServerOperationType.CRASH_HANDLER;
364  }
365
366  /**
367   * For this procedure, yield at end of each successful flow step so that all crashed servers
368   * can make progress rather than do the default which has each procedure running to completion
369   * before we move to the next. For crashed servers, especially if running with distributed log
370   * replay, we will want all servers to come along; we do not want the scenario where a server is
371   * stuck waiting for regions to online so it can replay edits.
372   */
373  @Override
374  protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) {
375    return true;
376  }
377
378  @Override
379  protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
380    // The operation is triggered internally on the server
381    // the client does not know about this procedure.
382    return false;
383  }
384
385  /**
386   * Handle any outstanding RIT that are up against this.serverName, the crashed server.
387   * Notify them of crash. Remove assign entries from the passed in <code>regions</code>
388   * otherwise we have two assigns going on and they will fight over who has lock.
389   * Notify Unassigns. If unable to unassign because server went away, unassigns block waiting
390   * on the below callback from a ServerCrashProcedure before proceeding.
391   * @param regions Regions on the Crashed Server.
392   * @return List of regions we should assign to new homes (not same as regions on crashed server).
393   */
394  private List<RegionInfo> handleRIT(final MasterProcedureEnv env, List<RegionInfo> regions) {
395    if (regions == null || regions.isEmpty()) {
396      return Collections.emptyList();
397    }
398    AssignmentManager am = env.getMasterServices().getAssignmentManager();
399    List<RegionInfo> toAssign = new ArrayList<RegionInfo>(regions);
400    // Get an iterator so can remove items.
401    final Iterator<RegionInfo> it = toAssign.iterator();
402    ServerCrashException sce = null;
403    while (it.hasNext()) {
404      final RegionInfo hri = it.next();
405      RegionTransitionProcedure rtp = am.getRegionStates().getRegionTransitionProcedure(hri);
406      if (rtp == null) {
407        continue;
408      }
409      // Make sure the RIT is against this crashed server. In the case where there are many
410      // processings of a crashed server -- backed up for whatever reason (slow WAL split) --
411      // then a previous SCP may have already failed an assign, etc., and it may have a new
412      // location target; DO NOT fail these else we make for assign flux.
413      ServerName rtpServerName = rtp.getServer(env);
414      if (rtpServerName == null) {
415        LOG.warn("RIT with ServerName null! " + rtp);
416        continue;
417      }
418      if (!rtpServerName.equals(this.serverName)) continue;
419      LOG.info("pid=" + getProcId() + " found RIT " + rtp + "; " +
420        rtp.getRegionState(env).toShortString());
421      // Notify RIT on server crash.
422      if (sce == null) {
423        sce = new ServerCrashException(getProcId(), getServerName());
424      }
425      if(rtp.remoteCallFailed(env, this.serverName, sce)) {
426        // If an assign, remove from passed-in list of regions so we subsequently do not create
427        // a new assign; the exisitng assign after the call to remoteCallFailed will recalibrate
428        // and assign to a server other than the crashed one; no need to create new assign.
429        // If an unassign, do not return this region; the above cancel will wake up the unassign and
430        // it will complete. Done.
431        it.remove();
432      }
433
434    }
435    return toAssign;
436  }
437
438  @Override
439  protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
440    return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
441  }
442}