/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;

/**
 * The procedure for transiting the current sync replication state of a synchronous replication
 * peer.
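 * <p>
 * At a high level the procedure runs the pre check and the pre CP hook, stores the target state as
 * the peer's transitional new sync replication state, refreshes all region servers, then depending
 * on the source and target states reopens the peer's regions, removes the replication queues,
 * updates the last pushed sequence ids for a serial peer and/or replays the remote WALs, commits
 * the new state, refreshes the region servers again, and finally re-enables the peer, creates the
 * remote WAL directory and runs the post CP hook as needed.
 * <p>
 * A minimal client-side sketch of how such a transition is usually requested, assuming the
 * {@code Admin} API of recent HBase versions and a hypothetical peer id:
 *
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *   Admin admin = conn.getAdmin()) {
 *   // ask the master to move this cluster's state for the peer to DOWNGRADE_ACTIVE; the master
 *   // will run a TransitPeerSyncReplicationStateProcedure for it
 *   admin.transitReplicationPeerSyncReplicationState("peer_id",
 *     SyncReplicationState.DOWNGRADE_ACTIVE);
 * }
 * </pre>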
 */
@InterfaceAudience.Private
public class TransitPeerSyncReplicationStateProcedure
  extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {

  private static final Logger LOG =
    LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);

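  /**
   * The sync replication state we are transiting from. Only filled in by
   * {@link #preTransit(MasterProcedureEnv)}, which is why the serialization code guards it with a
   * null check.
   */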
  protected SyncReplicationState fromState;

  private SyncReplicationState toState;

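  /**
   * Whether the peer was enabled before the transition; used to decide whether to re-enable it at
   * the end.
   */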
  private boolean enabled;

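  /**
   * Whether the peer uses serial replication, which requires extra handling of the last pushed
   * sequence ids during the transition.
   */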
  private boolean serial;

  public TransitPeerSyncReplicationStateProcedure() {
  }

  public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
    super(peerId);
    this.toState = state;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData.Builder builder =
      TransitPeerSyncReplicationStateStateData.newBuilder()
        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
    if (fromState != null) {
      builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
    }
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData data =
      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
    toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
    if (data.hasFromState()) {
      fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
    }
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getState(int stateId) {
    return PeerSyncReplicationStateTransitionState.forNumber(stateId);
  }

  @Override
  protected int getStateId(PeerSyncReplicationStateTransitionState state) {
    return state.getNumber();
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getInitialState() {
    return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
  }

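  /**
   * Run the pre CP hook and let the replication peer manager validate the requested transition,
   * then record the peer's current state, enabled flag and serial flag for the rest of the
   * procedure. When transiting to ACTIVE the peer's remote WAL directory must already exist.
   */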
  protected void preTransit(MasterProcedureEnv env) throws IOException {
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
    }
    ReplicationPeerDescription desc =
      env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
    if (toState == SyncReplicationState.ACTIVE) {
      Path remoteWALDirForPeer =
        ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
      // check whether the remote wal directory is present
      if (
        !remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration()).exists(remoteWALDirForPeer)
      ) {
        throw new DoNotRetryIOException(
          "The remote WAL directory " + remoteWALDirForPeer + " does not exist");
      }
    }
    fromState = desc.getSyncReplicationState();
    enabled = desc.isEnabled();
    serial = desc.getPeerConfig().isSerial();
  }

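  /**
   * Run the post CP hook. This is called after the transition has completed, so failures are only
   * logged by the caller.
   */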
  private void postTransit(MasterProcedureEnv env) throws IOException {
    LOG.info(
      "Successfully transited current cluster state from {} to {} for sync replication peer {}",
      fromState, toState, peerId);
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postTransitReplicationPeerSyncReplicationState(peerId, fromState, toState);
    }
  }

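  /**
   * Schedule a {@link ReopenTableRegionsProcedure} for every table in the peer's table-cfs map, so
   * that the new sync replication state can take effect for those regions.
   */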
  protected void reopenRegions(MasterProcedureEnv env) {
    addChildProcedure(
      env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet().stream()
        .map(ReopenTableRegionsProcedure::new).toArray(ReopenTableRegionsProcedure[]::new));
  }

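  /**
   * Create the per-peer remote WAL directory under this cluster's WAL root directory. An already
   * existing directory is only logged as a warning, while a failed mkdirs is an error.
   */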
  protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
    MasterFileSystem mfs = env.getMasterFileSystem();
    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
    FileSystem walFs = mfs.getWALFileSystem();
    if (walFs.exists(remoteWALDirForPeer)) {
      LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
        remoteWALDirForPeer);
    } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
      throw new IOException("Failed to create remote wal dir " + remoteWALDirForPeer);
    }
  }

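  /**
   * Choose the next state after all region servers have been told about the transitional new sync
   * replication state: when leaving ACTIVE or DOWNGRADE_ACTIVE we either remove the replication
   * queues (target STANDBY) or reopen the peer's regions; when leaving STANDBY we reopen the
   * regions first for a serial peer, otherwise go straight to replaying the remote WALs.
   */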
  private void setNextStateAfterRefreshBegin() {
    if (fromState.equals(SyncReplicationState.ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      // For a serial peer we need to reopen all the regions and then update the last pushed
      // sequence id before replaying any remote wals, so that the serial replication will not be
      // stuck, and also to guarantee the order when replicating the remote wal back.
      setNextState(serial
        ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
        : PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
    }
  }

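  /**
   * Choose the next state after the final refresh on the region servers: re-enable the peer if it
   * was enabled before and we disabled it for the transition, create the remote WAL directory when
   * the peer was already disabled and we are ending up in STANDBY, otherwise go to the post CP
   * hook.
   */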
  private void setNextStateAfterRefreshEnd() {
    if (toState == SyncReplicationState.STANDBY) {
      setNextState(enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
    } else if (fromState == SyncReplicationState.STANDBY) {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      setNextState(serial && enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    } else {
      setNextState(
        PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    }
  }

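  /**
   * Schedule a {@link RecoverStandbyProcedure} to replay the remote WALs of this peer before we
   * finish transiting away from STANDBY.
   */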
  private void replayRemoteWAL(boolean serial) {
    addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
  }

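  /**
   * Store the target state as the peer's transitional new sync replication state in the peer
   * storage, disabling the peer first when the transition requires it (see the comments below).
   */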
  protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
    throws ReplicationException {
    if (
      toState.equals(SyncReplicationState.STANDBY)
        || (fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled
    ) {
      // Disable the peer if we are going to transit to STANDBY state, as we need to remove all
      // the pending replication files. If we did not disable the peer and just deleted the wal
      // queues on zk directly, the RS would get a NoNode exception when updating the wal position
      // and crash.
      // Also disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE and the
      // replication is serial, as we need to update the last pushed sequence ids after we reopen
      // all the regions; for performance reasons we update them in batch, without using CAS, so if
      // we were still replicating at the RS side we might accidentally move the last pushed
      // sequence id back to a smaller value and get the replication stuck.
      env.getReplicationPeerManager().disablePeer(peerId);
    }
    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
  }

  protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException {
    env.getReplicationPeerManager().removeAllQueues(peerId);
  }

  protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
    throws ReplicationException {
    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
  }

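  /**
   * The state machine driver. Recoverable failures, such as peer storage or filesystem errors, are
   * retried with backoff via {@code suspend}, while a failed pre check marks the whole procedure
   * as failed.
   */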
  @Override
  protected Flow executeFromState(MasterProcedureEnv env,
    PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException {
    switch (state) {
      case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          if (
            env.getMasterServices().getProcedures().stream()
              .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
              .anyMatch(p -> !p.isFinished())
          ) {
            LOG.info("There is a pending {}, give up execution of {}",
              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
              getClass().getSimpleName());
            setFailure("master-transit-peer-sync-replication-state",
              new DoNotRetryIOException("There is a pending "
                + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
            return Flow.NO_MORE_STATE;
          }
          checkPeerModificationEnabled(env);
          preTransit(env);
        } catch (IOException e) {
          LOG.warn("Failed to call pre CP hook or the pre check failed for peer {} "
            + "when transiting sync replication peer state to {}, "
            + "mark the procedure as failed and give up", peerId, toState, e);
          setFailure("master-transit-peer-sync-replication-state", e);
          return Flow.NO_MORE_STATE;
        }
        setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case SET_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          setPeerNewSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when starting the transition of sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshBegin();
        return Flow.HAS_MORE_STATE;
      case REOPEN_ALL_REGIONS_IN_PEER:
        reopenRegions(env);
        if (fromState.equals(SyncReplicationState.STANDBY)) {
          assert serial;
          setNextState(
            PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
        } else {
          setNextState(
            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        }
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
        try {
          setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
        } catch (Exception e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update last pushed sequence id for peer {} when transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
        return Flow.HAS_MORE_STATE;
      case REPLAY_REMOTE_WAL_IN_PEER:
        replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
        setNextState(
          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
        try {
          removeAllReplicationQueues(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to remove all replication queues for peer {} when starting the transition"
                + " of sync replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(fromState.equals(SyncReplicationState.ACTIVE)
          ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
          : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          transitPeerSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when ending the transition of sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshEnd();
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_SET_PEER_ENABLED:
        try {
          enablePeer(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to set peer enabled for peer {} when transiting sync replication peer "
                + "state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
        refreshPeer(env, PeerOperationType.ENABLE);
        setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
        return Flow.HAS_MORE_STATE;
      case CREATE_DIR_FOR_REMOTE_WAL:
        try {
          createDirForRemoteWAL(env);
        } catch (IOException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to create remote wal dir for peer {} when transiting sync replication "
                + "peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
        return Flow.HAS_MORE_STATE;
      case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          postTransit(env);
        } catch (IOException e) {
          LOG.warn(
            "Failed to call post CP hook for peer {} when transiting sync replication "
              + "peer state from {} to {}, ignore since the transition has already been done",
            peerId, fromState, toState, e);
        }
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }
}