001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
025import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
026import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
027import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData;
034
035/**
036 * The procedure for replaying all the remote wals for transitting a sync replication peer from
037 * STANDBY to DOWNGRADE_ACTIVE.
038 */
039@InterfaceAudience.Private
040public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure<RecoverStandbyState> {
041
042  private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class);
043
044  private boolean serial;
045
046  public RecoverStandbyProcedure() {
047  }
048
049  public RecoverStandbyProcedure(String peerId, boolean serial) {
050    super(peerId);
051    this.serial = serial;
052  }
053
054  @Override
055  protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state)
056    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
057    SyncReplicationReplayWALManager syncReplicationReplayWALManager =
058      env.getMasterServices().getSyncReplicationReplayWALManager();
059    switch (state) {
060      case RENAME_SYNC_REPLICATION_WALS_DIR:
061        try {
062          syncReplicationReplayWALManager.renameToPeerReplayWALDir(peerId);
063        } catch (IOException e) {
064          LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
065          setFailure("master-recover-standby", e);
066          return Flow.NO_MORE_STATE;
067        }
068        setNextState(RecoverStandbyState.REGISTER_PEER_TO_WORKER_STORAGE);
069        return Flow.HAS_MORE_STATE;
070      case REGISTER_PEER_TO_WORKER_STORAGE:
071        syncReplicationReplayWALManager.registerPeer(peerId);
072        setNextState(RecoverStandbyState.DISPATCH_WALS);
073        return Flow.HAS_MORE_STATE;
074      case DISPATCH_WALS:
075        dispathWals(syncReplicationReplayWALManager);
076        setNextState(RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE);
077        return Flow.HAS_MORE_STATE;
078      case UNREGISTER_PEER_FROM_WORKER_STORAGE:
079        syncReplicationReplayWALManager.unregisterPeer(peerId);
080        setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
081        return Flow.HAS_MORE_STATE;
082      case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
083        try {
084          syncReplicationReplayWALManager.renameToPeerSnapshotWALDir(peerId);
085        } catch (IOException e) {
086          LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
087          throw new ProcedureYieldException();
088        }
089        return Flow.NO_MORE_STATE;
090      default:
091        throw new UnsupportedOperationException("unhandled state=" + state);
092    }
093  }
094
095  // TODO: dispatch wals by region server when serial is true and sort wals
096  private void dispathWals(SyncReplicationReplayWALManager syncReplicationReplayWALManager)
097    throws ProcedureYieldException {
098    try {
099      List<Path> wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
100      addChildProcedure(wals.stream()
101        .map(wal -> new SyncReplicationReplayWALProcedure(peerId,
102          Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal))))
103        .toArray(SyncReplicationReplayWALProcedure[]::new));
104    } catch (IOException e) {
105      LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
106      throw new ProcedureYieldException();
107    }
108  }
109
110  @Override
111  protected RecoverStandbyState getState(int stateId) {
112    return RecoverStandbyState.forNumber(stateId);
113  }
114
115  @Override
116  protected int getStateId(RecoverStandbyState state) {
117    return state.getNumber();
118  }
119
120  @Override
121  protected RecoverStandbyState getInitialState() {
122    return RecoverStandbyState.RENAME_SYNC_REPLICATION_WALS_DIR;
123  }
124
125  @Override
126  public PeerOperationType getPeerOperationType() {
127    return PeerOperationType.RECOVER_STANDBY;
128  }
129
130  @Override
131  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
132    super.serializeStateData(serializer);
133    serializer.serialize(RecoverStandbyStateData.newBuilder().setSerial(serial).build());
134  }
135
136  @Override
137  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
138    super.deserializeStateData(serializer);
139    RecoverStandbyStateData data = serializer.deserialize(RecoverStandbyStateData.class);
140    serial = data.getSerial();
141  }
142
143  @Override
144  protected void afterReplay(MasterProcedureEnv env) {
145    // For these two states, we need to register the peer to the replay manager, as the state are
146    // only kept in memory and will be lost after restarting. And in
147    // SyncReplicationReplayWALProcedure.afterReplay we will reconstruct the used workers.
148    switch (getCurrentState()) {
149      case DISPATCH_WALS:
150      case UNREGISTER_PEER_FROM_WORKER_STORAGE:
151        env.getMasterServices().getSyncReplicationReplayWALManager().registerPeer(peerId);
152        break;
153      default:
154        break;
155    }
156  }
157}