/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure.Flow;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;

/**
 * The procedure for transiting the current sync replication state of a synchronous replication
 * peer.
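 * <p>
 * A minimal submission sketch (assuming a {@code ProcedureExecutor<MasterProcedureEnv>} named
 * {@code procExec} and an existing synchronous replication peer "1"; both names are only
 * illustrative):
 *
 * <pre>
 * long procId = procExec.submitProcedure(
 *   new TransitPeerSyncReplicationStateProcedure("1", SyncReplicationState.DOWNGRADE_ACTIVE));
 * // the transition is complete once the procedure is finished
 * boolean done = procExec.isFinished(procId);
 * </pre>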
 */
@InterfaceAudience.Private
public class TransitPeerSyncReplicationStateProcedure
  extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {

  private static final Logger LOG =
    LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);

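  // toState is set at construction time; fromState, enabled and serial are captured from the
  // peer in preTransit, and fromState is also persisted with the procedure state.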
  protected SyncReplicationState fromState;

  private SyncReplicationState toState;

  private boolean enabled;

  private boolean serial;

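  // Used by the procedure framework to instantiate the procedure when loading it back from the
  // procedure store.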
  public TransitPeerSyncReplicationStateProcedure() {
  }

  public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
    super(peerId);
    this.toState = state;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData.Builder builder =
      TransitPeerSyncReplicationStateStateData.newBuilder()
        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
    if (fromState != null) {
      builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
    }
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData data =
      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
    toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
    if (data.hasFromState()) {
      fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
    }
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getState(int stateId) {
    return PeerSyncReplicationStateTransitionState.forNumber(stateId);
  }

  @Override
  protected int getStateId(PeerSyncReplicationStateTransitionState state) {
    return state.getNumber();
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getInitialState() {
    return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
  }

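  /**
   * Called before the actual transition starts: invokes the pre CP hook, asks the
   * ReplicationPeerManager to validate the requested transition (and, for a transition to
   * ACTIVE, checks that the remote WAL directory exists), and records the current state, enabled
   * flag and serial flag of the peer for use by later steps.
   */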
  protected void preTransit(MasterProcedureEnv env) throws IOException {
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
    }
    ReplicationPeerDescription desc =
      env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
    if (toState == SyncReplicationState.ACTIVE) {
      Path remoteWALDirForPeer =
        ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
      // check whether the remote wal directory is present
      if (
        !remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration()).exists(remoteWALDirForPeer)
      ) {
        throw new DoNotRetryIOException(
          "The remote WAL directory " + remoteWALDirForPeer + " does not exist");
      }
    }
    fromState = desc.getSyncReplicationState();
    enabled = desc.isEnabled();
    serial = desc.getPeerConfig().isSerial();
  }

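  // Log the completed transition and invoke the post CP hook; this is the last step of the
  // procedure.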
  private void postTransit(MasterProcedureEnv env) throws IOException {
    LOG.info(
      "Successfully transited current cluster state from {} to {} for sync replication peer {}",
      fromState, toState, peerId);
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postTransitReplicationPeerSyncReplicationState(peerId, fromState, toState);
    }
  }

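  // Schedule a ReopenTableRegionsProcedure child procedure for every table replicated by this
  // peer, so that the regions pick up the new sync replication state.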
  protected void reopenRegions(MasterProcedureEnv env) {
    addChildProcedure(
      env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet().stream()
        .map(ReopenTableRegionsProcedure::new).toArray(ReopenTableRegionsProcedure[]::new));
  }

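  // Create the per-peer remote WAL directory under the WAL root directory; when this cluster
  // transits to STANDBY, this is where the remote cluster will write its WALs for the peer.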
  protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
    MasterFileSystem mfs = env.getMasterFileSystem();
    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
    FileSystem walFs = mfs.getWALFileSystem();
    if (walFs.exists(remoteWALDirForPeer)) {
      LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
        remoteWALDirForPeer);
    } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
      throw new IOException("Failed to create remote wal dir " + remoteWALDirForPeer);
    }
  }

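  // Decide the next state after the region servers have been told about the pending transition:
  // transitions to STANDBY remove the replication queues first, transitions between ACTIVE and
  // DOWNGRADE_ACTIVE reopen the regions, and transitions from STANDBY either reopen the regions
  // (serial peer) or replay the remote WALs directly.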
  private void setNextStateAfterRefreshBegin() {
    if (fromState.equals(SyncReplicationState.ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      // For a serial peer, we need to reopen all the regions and then update the last pushed
      // sequence id before replaying any remote wals, so that the serial replication will not be
      // stuck, and also to guarantee the order when replicating the remote wal back.
      setNextState(serial
        ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
        : PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
    }
  }

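  // Decide the next state after the region servers have applied the new sync replication state:
  // a new STANDBY peer may need to be re-enabled and still needs its remote WAL directory, a
  // serial peer coming out of STANDBY may need to be re-enabled, and everything else goes
  // straight to the post transition step.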
  private void setNextStateAfterRefreshEnd() {
    if (toState == SyncReplicationState.STANDBY) {
      setNextState(enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
    } else if (fromState == SyncReplicationState.STANDBY) {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      setNextState(serial && enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    } else {
      setNextState(
        PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    }
  }

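  // Replaying the remote WALs is delegated to a child RecoverStandbyProcedure.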
  private void replayRemoteWAL(boolean serial) {
    addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
  }

  protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
    throws ReplicationException {
    if (
      toState.equals(SyncReplicationState.STANDBY)
        || (fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled
    ) {
      // Disable the peer if we are going to transit to STANDBY state, as we need to remove
      // all the pending replication files. If we do not disable the peer and delete the wal
      // queues on zk directly, RS will get NoNode exception when updating the wal position
      // and crash.
      // Also disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE and the
      // replication is serial, as we need to update the last pushed sequence id after we reopen
      // all the regions, and for performance reasons we update in batch, without using CAS; if
      // the RS side is still replicating, we may accidentally update the last pushed sequence id
      // to a smaller value and cause the replication to be stuck.
      env.getReplicationPeerManager().disablePeer(peerId);
    }
    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
  }

  protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException {
    env.getReplicationPeerManager().removeAllQueues(peerId);
  }

  protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
    throws ReplicationException {
    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
  }

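  /*
   * The main state machine. The happy path is roughly: PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
   * -> SET_PEER_NEW_SYNC_REPLICATION_STATE -> REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN ->
   * some combination of REMOVE_ALL_REPLICATION_QUEUES_IN_PEER, REOPEN_ALL_REGIONS_IN_PEER,
   * SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER and REPLAY_REMOTE_WAL_IN_PEER
   * (depending on the from/to states and whether the peer is serial) ->
   * TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE -> REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END ->
   * optionally SYNC_REPLICATION_SET_PEER_ENABLED, SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS
   * and CREATE_DIR_FOR_REMOTE_WAL -> POST_PEER_SYNC_REPLICATION_STATE_TRANSITION. Retriable
   * failures suspend the procedure with backoff instead of failing it.
   */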
  @Override
  protected Flow executeFromState(MasterProcedureEnv env,
    PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException {
    switch (state) {
      case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          if (
            env.getMasterServices().getProcedures().stream()
              .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
              .anyMatch(p -> !p.isFinished())
          ) {
            LOG.info("There is a pending {}, give up execution of {}",
              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
              getClass().getSimpleName());
            setFailure("master-transit-peer-sync-replication-state",
              new DoNotRetryIOException("There is a pending "
                + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
            return Flow.NO_MORE_STATE;
          }
          checkPeerModificationEnabled(env);
          preTransit(env);
        } catch (IOException e) {
          LOG.warn("Failed to call pre CP hook or the pre check failed for peer {} "
            + "when transiting sync replication peer state to {}, "
            + "mark the procedure as failure and give up", peerId, toState, e);
          setFailure("master-transit-peer-sync-replication-state", e);
          return Flow.NO_MORE_STATE;
        }
        setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case SET_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          setPeerNewSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when starting to transit sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshBegin();
        return Flow.HAS_MORE_STATE;
      case REOPEN_ALL_REGIONS_IN_PEER:
        reopenRegions(env);
        if (fromState.equals(SyncReplicationState.STANDBY)) {
          assert serial;
          setNextState(
            PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
        } else {
          setNextState(
            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        }
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
        try {
          setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
        } catch (Exception e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update last pushed sequence id for peer {} when transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
        return Flow.HAS_MORE_STATE;
      case REPLAY_REMOTE_WAL_IN_PEER:
        replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
        setNextState(
          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
        try {
          removeAllReplicationQueues(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to remove all replication queues for peer {} when starting to transit"
                + " sync replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(fromState.equals(SyncReplicationState.ACTIVE)
          ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
          : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          transitPeerSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when finishing the transit of sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshEnd();
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_SET_PEER_ENABLED:
        try {
          enablePeer(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to set peer enabled for peer {} when transiting sync replication peer "
                + "state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
        refreshPeer(env, PeerOperationType.ENABLE);
        setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
        return Flow.HAS_MORE_STATE;
      case CREATE_DIR_FOR_REMOTE_WAL:
        try {
          createDirForRemoteWAL(env);
        } catch (IOException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to create remote wal dir for peer {} when transiting sync replication "
                + "peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
        return Flow.HAS_MORE_STATE;
      case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          postTransit(env);
        } catch (IOException e) {
          LOG.warn(
            "Failed to call post CP hook for peer {} when transiting sync replication "
              + "peer state from {} to {}, ignore since the procedure is already done",
            peerId, fromState, toState, e);
        }
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }
}