/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterWalManager;
import org.apache.hadoop.hbase.master.SplitWALManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;

/**
 * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
 * ServerShutdownHandler.
 *
 * <p>The procedure flow varies depending on whether meta is assigned and whether we are to split
 * logs.
 *
 * <p>We come in here after ServerManager has noticed a server has expired. Procedures
 * queued on the rpc should have been notified about the failure and should be concurrently
 * getting themselves ready to assign elsewhere.
 */
@InterfaceAudience.Private
public class ServerCrashProcedure
    extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState>
    implements ServerProcedureInterface {
  private static final Logger LOG = LoggerFactory.getLogger(ServerCrashProcedure.class);

  /**
   * Name of the crashed server to process.
   */
  private ServerName serverName;

  /**
   * Whether DeadServer knows that we are processing it.
   */
  private boolean notifiedDeadServer = false;

  /**
   * Regions that were on the crashed server.
   */
  private List<RegionInfo> regionsOnCrashedServer;

  private boolean carryingMeta = false;
  private boolean shouldSplitWal;
  private MonitoredTask status;
  // currentRunningState is updated when the ServerCrashProcedure gets scheduled; progress updates
  // from child procedures do not update it, because the actual state would be overwritten by the
  // next state anyway.
  private ServerCrashState currentRunningState = getInitialState();

  /**
   * Call this constructor to queue up a Procedure.
   * @param serverName Name of the crashed server.
   * @param shouldSplitWal True if we should split WALs as part of crashed server processing.
   * @param carryingMeta True if carrying the hbase:meta table region.
   */
  public ServerCrashProcedure(final MasterProcedureEnv env, final ServerName serverName,
      final boolean shouldSplitWal, final boolean carryingMeta) {
    this.serverName = serverName;
    this.shouldSplitWal = shouldSplitWal;
    this.carryingMeta = carryingMeta;
    this.setOwner(env.getRequestUser());
  }

  /**
   * Used when deserializing from a procedure store; we'll construct one of these then call
   * #deserializeStateData(ProcedureStateSerializer). Do not use directly.
   */
  public ServerCrashProcedure() {
  }

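  /**
   * @return true if this procedure is currently in the SERVER_CRASH_PROCESS_META state.
   */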
  public boolean isInRecoverMetaState() {
    return getCurrentState() == ServerCrashState.SERVER_CRASH_PROCESS_META;
  }

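  /**
   * Drives the crash-handling state machine: if the crashed server was carrying hbase:meta, its
   * meta WALs are split and meta is reassigned first; then the remaining WALs are split and the
   * rest of the regions are reassigned. WAL splitting is done either inline (ZK-coordinated mode)
   * or via child split-WAL procedures, depending on HBASE_SPLIT_WAL_COORDINATED_BY_ZK.
   */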
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
      throws ProcedureSuspendedException, ProcedureYieldException {
    final MasterServices services = env.getMasterServices();
    final AssignmentManager am = env.getAssignmentManager();
    updateProgress(true);
    // HBASE-14802 If we have not yet notified that we are processing a dead server, do so now.
    // This adds the server to the DeadServer processing list but not to the DeadServers list.
    // The server gets removed from the processing list below on successful procedure finish.
    if (!notifiedDeadServer) {
      services.getServerManager().getDeadServers().notifyServer(serverName);
      notifiedDeadServer = true;
    }

    switch (state) {
      case SERVER_CRASH_START:
      case SERVER_CRASH_SPLIT_META_LOGS:
      case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
      case SERVER_CRASH_ASSIGN_META:
        break;
      default:
        // If hbase:meta is not assigned, yield.
        if (env.getAssignmentManager().waitMetaLoaded(this)) {
          throw new ProcedureSuspendedException();
        }
    }
    try {
      switch (state) {
        case SERVER_CRASH_START:
          LOG.info("Start " + this);
          // If carrying meta, process it first. Else, get the list of regions on the crashed
          // server.
          if (this.carryingMeta) {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
          }
          break;
        case SERVER_CRASH_SPLIT_META_LOGS:
          if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
            splitMetaLogs(env);
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
          } else {
            am.getRegionStates().metaLogSplitting(serverName);
            addChildProcedure(createSplittingWalProcedures(env, true));
            setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR);
          }
          break;
        case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
          if (isSplittingDone(env, true)) {
            cleanupSplitDir(env);
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
            am.getRegionStates().metaLogSplit(serverName);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
          }
          break;
        case SERVER_CRASH_ASSIGN_META:
          assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
          setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
          break;
        case SERVER_CRASH_GET_REGIONS:
          this.regionsOnCrashedServer = getRegionsOnCrashedServer(env);
          // Where to go next? Depends on whether we should split logs at all or
          // if we should do distributed log splitting.
          if (regionsOnCrashedServer != null) {
            LOG.info("{} had {} regions", serverName, regionsOnCrashedServer.size());
            if (LOG.isTraceEnabled()) {
              this.regionsOnCrashedServer.forEach(ri -> LOG.trace(ri.getShortNameToLog()));
            }
          }
          if (!this.shouldSplitWal) {
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
          }
          break;
        case SERVER_CRASH_SPLIT_LOGS:
          if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
            splitLogs(env);
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
          } else {
            am.getRegionStates().logSplitting(this.serverName);
            addChildProcedure(createSplittingWalProcedures(env, false));
            setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_WALS_DIR);
          }
          break;
        case SERVER_CRASH_DELETE_SPLIT_WALS_DIR:
          if (isSplittingDone(env, false)) {
            cleanupSplitDir(env);
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
            am.getRegionStates().logSplit(this.serverName);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
          }
          break;
        case SERVER_CRASH_ASSIGN:
          // If there are no regions to assign, skip assign and go straight to the finish.
          // Filter out meta regions; those are handled elsewhere in this procedure.
          // The filter changes this.regionsOnCrashedServer.
          if (filterDefaultMetaRegions()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Assigning regions " +
                RegionInfo.getShortNameToLog(regionsOnCrashedServer) + ", " + this +
                "; cycles=" + getCycles());
            }
            assignRegions(env, regionsOnCrashedServer);
          }
          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
          break;
        case SERVER_CRASH_HANDLE_RIT2:
          // Noop. Left in place because we used to call handleRIT here for a second time,
          // but that is no longer necessary since HBASE-20634.
          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
          break;
        case SERVER_CRASH_FINISH:
          LOG.info("Removed crashed server {} after splitting done", serverName);
          services.getAssignmentManager().getRegionStates().removeServer(serverName);
          services.getServerManager().getDeadServers().finish(serverName);
          updateProgress(true);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (IOException e) {
      LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + getCycles(), e);
    }
    return Flow.HAS_MORE_STATE;
  }

  /**
   * @return List of Regions on crashed server.
   */
  List<RegionInfo> getRegionsOnCrashedServer(MasterProcedureEnv env) {
    return env.getMasterServices().getAssignmentManager().getRegionsOnServer(serverName);
  }

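  /**
   * Delete the crashed server's WAL directory once splitting has completed. Failure to delete is
   * only logged and otherwise ignored.
   */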
  private void cleanupSplitDir(MasterProcedureEnv env) {
    SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
    try {
      splitWALManager.deleteWALDir(serverName);
    } catch (IOException e) {
      LOG.warn("Failed to remove the WAL directory of server {}, ignoring", serverName, e);
    }
  }

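  /**
   * @return true if there are no WALs of the crashed server left to split (meta WALs when
   *         splitMeta is true, otherwise non-meta WALs).
   */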
  private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) {
    LOG.debug("Checking if WAL splitting of {} is done, isMeta: {}", serverName, splitMeta);
    SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
    try {
      return splitWALManager.getWALsToSplit(serverName, splitMeta).isEmpty();
    } catch (IOException e) {
      LOG.warn("Failed to list the WALs of server {}, retrying", serverName, e);
      return false;
    }
  }

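  /**
   * Create one child procedure per WAL of the crashed server that still needs splitting (meta
   * WALs only when splitMeta is true).
   */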
  private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, boolean splitMeta)
      throws IOException {
    LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta);
    SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
    List<Procedure> procedures = splitWALManager.splitWALs(serverName, splitMeta);
    return procedures.toArray(new Procedure[procedures.size()]);
  }

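  /**
   * Drop default-replica meta regions from {@link #regionsOnCrashedServer}; they are handled by
   * the meta-specific states of this procedure.
   * @return true if there are still regions left to assign after filtering.
   */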
  private boolean filterDefaultMetaRegions() {
    if (regionsOnCrashedServer == null) {
      return false;
    }
    regionsOnCrashedServer.removeIf(this::isDefaultMetaRegion);
    return !regionsOnCrashedServer.isEmpty();
  }

  private boolean isDefaultMetaRegion(RegionInfo hri) {
    return hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri);
  }

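  /**
   * Split the meta WALs of the crashed server inline (used in the ZK-coordinated splitting mode),
   * flagging the meta-log-splitting state on the region states around the split.
   */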
  private void splitMetaLogs(MasterProcedureEnv env) throws IOException {
    LOG.debug("Splitting meta WALs {}", this);
    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
    AssignmentManager am = env.getMasterServices().getAssignmentManager();
    am.getRegionStates().metaLogSplitting(serverName);
    mwm.splitMetaLog(serverName);
    am.getRegionStates().metaLogSplit(serverName);
    LOG.debug("Done splitting meta WALs {}", this);
  }

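  /**
   * Split the non-meta WALs of the crashed server inline (used in the ZK-coordinated splitting
   * mode). Note that this blocks the procedure executor until the split completes.
   */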
  private void splitLogs(final MasterProcedureEnv env) throws IOException {
    LOG.debug("Splitting WALs {}", this);
    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
    AssignmentManager am = env.getMasterServices().getAssignmentManager();
    // TODO: For Matteo. The splitLog call below BLOCKS. Redo so we can relinquish the executor
    // while it is running.
    am.getRegionStates().logSplitting(this.serverName);
    mwm.splitLog(this.serverName);
    if (!carryingMeta) {
      mwm.archiveMetaLog(this.serverName);
    }
    am.getRegionStates().logSplit(this.serverName);
    LOG.debug("Done splitting WALs {}", this);
  }

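  /**
   * Create or refresh the MonitoredTask that tracks this procedure's progress. When updateState
   * is true the currently running state is recorded as well; once the procedure has reached
   * SERVER_CRASH_FINISH the task is marked complete.
   */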
  void updateProgress(boolean updateState) {
    String msg = "Processing ServerCrashProcedure of " + serverName;
    if (status == null) {
      status = TaskMonitor.get().createStatus(msg);
      return;
    }
    if (currentRunningState == ServerCrashState.SERVER_CRASH_FINISH) {
      status.markComplete(msg + " done");
      return;
    }
    if (updateState) {
      currentRunningState = getCurrentState();
    }
    int childrenLatch = getChildrenLatch();
    status.setStatus(msg + ", current state: " + currentRunningState
        + (childrenLatch > 0 ? "; remaining running child procedures = " + childrenLatch
            : ""));
  }

  @Override
  protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
      throws IOException {
    // Can't rollback.
    throw new UnsupportedOperationException("unhandled state=" + state);
  }

  @Override
  protected ServerCrashState getState(int stateId) {
    return ServerCrashState.forNumber(stateId);
  }

  @Override
  protected int getStateId(ServerCrashState state) {
    return state.getNumber();
  }

  @Override
  protected ServerCrashState getInitialState() {
    return ServerCrashState.SERVER_CRASH_START;
  }

  @Override
  protected boolean abort(MasterProcedureEnv env) {
    // TODO
    return false;
  }

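  /**
   * Crash handling needs the exclusive lock on the crashed server so that no other server
   * procedure can run against it concurrently; if the lock is not available we wait for the
   * lock event instead of busy-polling.
   */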
  @Override
  protected LockState acquireLock(final MasterProcedureEnv env) {
    if (env.getProcedureScheduler().waitServerExclusiveLock(this, getServerName())) {
      return LockState.LOCK_EVENT_WAIT;
    }
    return LockState.LOCK_ACQUIRED;
  }

  @Override
  protected void releaseLock(final MasterProcedureEnv env) {
    env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName());
  }


  @Override
  public void toStringClassDetails(StringBuilder sb) {
    sb.append(getClass().getSimpleName());
    sb.append(" server=");
    sb.append(serverName);
    sb.append(", splitWal=");
    sb.append(shouldSplitWal);
    sb.append(", meta=");
    sb.append(carryingMeta);
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer)
      throws IOException {
    super.serializeStateData(serializer);

    MasterProcedureProtos.ServerCrashStateData.Builder state =
      MasterProcedureProtos.ServerCrashStateData.newBuilder()
        .setServerName(ProtobufUtil.toServerName(this.serverName))
        .setCarryingMeta(this.carryingMeta)
        .setShouldSplitWal(this.shouldSplitWal);
    if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
      for (RegionInfo hri: this.regionsOnCrashedServer) {
        state.addRegionsOnCrashedServer(ProtobufUtil.toRegionInfo(hri));
      }
    }
    serializer.serialize(state.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer)
      throws IOException {
    super.deserializeStateData(serializer);

    MasterProcedureProtos.ServerCrashStateData state =
        serializer.deserialize(MasterProcedureProtos.ServerCrashStateData.class);
    this.serverName = ProtobufUtil.toServerName(state.getServerName());
    this.carryingMeta = state.hasCarryingMeta() ? state.getCarryingMeta() : false;
    // shouldSplitWal has a default in the pb definition so this invocation will always work.
    this.shouldSplitWal = state.getShouldSplitWal();
    int size = state.getRegionsOnCrashedServerCount();
    if (size > 0) {
      this.regionsOnCrashedServer = new ArrayList<>(size);
      for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri :
          state.getRegionsOnCrashedServerList()) {
        this.regionsOnCrashedServer.add(ProtobufUtil.toRegionInfo(ri));
      }
    }
    updateProgress(false);
  }

  @Override
  public ServerName getServerName() {
    return this.serverName;
  }

  @Override
  public boolean hasMetaTableRegion() {
    return this.carryingMeta;
  }

  @Override
  public ServerOperationType getServerOperationType() {
    return ServerOperationType.CRASH_HANDLER;
  }

  @Override
  protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
    // The operation is triggered internally on the server;
    // the client does not know about this procedure.
    return false;
  }

  /**
   * Assign the regions on the crashed RS to other RSes.
   * <p/>
   * In this method we will go through all the RegionStateNodes of the given regions to find out
   * whether there is already a TRSP for the region; if so, we interrupt it and let it retry on
   * another server, otherwise we schedule a TRSP to bring the region online.
   * <p/>
   * We will also check whether the table for a region is enabled; if not, we will skip assigning
   * it.
   */
  private void assignRegions(MasterProcedureEnv env, List<RegionInfo> regions) throws IOException {
    AssignmentManager am = env.getMasterServices().getAssignmentManager();
    for (RegionInfo region : regions) {
      RegionStateNode regionNode = am.getRegionStates().getOrCreateRegionStateNode(region);
      regionNode.lock();
      try {
        // This is possible: when a server is dead, a TRSP will fail to schedule a RemoteProcedure
        // against it and will then try to assign the region to a new RS. Before it has updated the
        // region location to the new RS, we may already have called am.getRegionsOnServer, so we
        // still consider the region to be on the crashed server. Then, before we arrive here, the
        // TRSP could have updated the region location, or even finished itself, so the region is
        // no longer on us and we should not try to assign it again. See HBASE-23594 for details.
        if (!serverName.equals(regionNode.getRegionLocation())) {
          LOG.info("{} found region {} which is no longer on us ({}), give up assigning...", this,
            regionNode, serverName);
          continue;
        }
        if (regionNode.getProcedure() != null) {
          LOG.info("{} found RIT {}; {}", this, regionNode.getProcedure(), regionNode);
          regionNode.getProcedure().serverCrashed(env, regionNode, getServerName());
          continue;
        }
        if (env.getMasterServices().getTableStateManager()
          .isTableState(regionNode.getTable(), TableState.State.DISABLING)) {
          // We need to change the state here, otherwise the TRSP scheduled by DTP will try to
          // close the region on a dead server and will never succeed. Please see HBASE-23636
          // for more details.
          env.getAssignmentManager().regionClosedAbnormally(regionNode);
          LOG.info("{} found table disabling for region {}, set its state to ABNORMALLY_CLOSED.",
            this, regionNode);
          continue;
        }
        if (env.getMasterServices().getTableStateManager()
          .isTableState(regionNode.getTable(), TableState.State.DISABLED)) {
          // This should not happen: the table is disabled but still has regions on the server.
          LOG.warn("Found table disabled for region {}, procDetails: {}", regionNode, this);
          continue;
        }
        // Force to assign to a new candidate server; see HBASE-23035 for more details.
        TransitRegionStateProcedure proc =
          TransitRegionStateProcedure.assign(env, region, true, null);
        regionNode.setProcedure(proc);
        addChildProcedure(proc);
      } finally {
        regionNode.unlock();
      }
    }
  }


  @Override
  protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
    return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
  }

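  /**
   * Keep the exclusive server lock for the whole life of the procedure rather than reacquiring it
   * at every step, so no other server procedure can interleave with crash processing.
   */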
  @Override
  protected boolean holdLock(MasterProcedureEnv env) {
    return true;
  }

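  /**
   * Refresh the progress of the parent ServerCrashProcedure, if the given parent procedure id
   * refers to one. Intended to be called by child procedures as they make progress.
   */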
  public static void updateProgress(MasterProcedureEnv env, long parentId) {
    if (parentId == NO_PROC_ID) {
      return;
    }
    Procedure parentProcedure =
        env.getMasterServices().getMasterProcedureExecutor().getProcedure(parentId);
    if (parentProcedure instanceof ServerCrashProcedure) {
      ((ServerCrashProcedure) parentProcedure).updateProgress(false);
    }
  }
}