/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
/**
 * The procedure for transitioning the current sync replication state of a synchronous replication
 * peer.
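 * <p>
 * A minimal sketch of how this procedure is typically triggered from the client side, assuming the
 * standard {@code Admin} sync replication API is available in your version:
 *
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
 *   // Ask the master to move peer "1" to DOWNGRADE_ACTIVE; the master then runs this procedure.
 *   admin.transitReplicationPeerSyncReplicationState("1", SyncReplicationState.DOWNGRADE_ACTIVE);
 * }
 * </pre>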
 */
@InterfaceAudience.Private
public class TransitPeerSyncReplicationStateProcedure
    extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {

  private static final Logger LOG =
    LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);

  // the sync replication state the peer is in before the transition, recorded in preTransit
  protected SyncReplicationState fromState;

  // the sync replication state we are transiting the peer to
  private SyncReplicationState toState;

  // whether the peer was enabled when the transition started, recorded in preTransit
  private boolean enabled;

  // whether the peer is configured as a serial replication peer, recorded in preTransit
  private boolean serial;

  // used by the procedure framework when loading the procedure back from the store
  public TransitPeerSyncReplicationStateProcedure() {
  }

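  /**
   * @param peerId id of the synchronous replication peer to transit
   * @param state the target {@link SyncReplicationState}
   */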
  public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
    super(peerId);
    this.toState = state;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData.Builder builder =
      TransitPeerSyncReplicationStateStateData.newBuilder()
        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
    if (fromState != null) {
      builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
    }
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData data =
      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
    toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
    if (data.hasFromState()) {
      fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
    }
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getState(int stateId) {
    return PeerSyncReplicationStateTransitionState.forNumber(stateId);
  }

  @Override
  protected int getStateId(PeerSyncReplicationStateTransitionState state) {
    return state.getNumber();
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getInitialState() {
    return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
  }

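  /**
   * Pre-flight checks before any state is changed: call the pre CP hook, let the
   * ReplicationPeerManager validate the requested transition, and, when transiting to ACTIVE,
   * verify that the remote WAL directory for the peer exists. Also records fromState, enabled and
   * serial for the later steps.
   */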
  @VisibleForTesting
  protected void preTransit(MasterProcedureEnv env) throws IOException {
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
    }
    ReplicationPeerDescription desc =
      env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
    if (toState == SyncReplicationState.ACTIVE) {
      Path remoteWALDirForPeer =
        ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
      // check whether the remote WAL directory is present
      if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
        .exists(remoteWALDirForPeer)) {
        throw new DoNotRetryIOException(
          "The remote WAL directory " + remoteWALDirForPeer + " does not exist");
      }
    }
    fromState = desc.getSyncReplicationState();
    enabled = desc.isEnabled();
    serial = desc.getPeerConfig().isSerial();
  }

  private void postTransit(MasterProcedureEnv env) throws IOException {
    LOG.info(
      "Successfully transitioned current cluster state from {} to {} for sync replication peer {}",
      fromState, toState, peerId);
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postTransitReplicationPeerSyncReplicationState(peerId, fromState, toState);
    }
  }

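  /**
   * Adds a ReopenTableRegionsProcedure child procedure for every table the peer replicates, so
   * that the regions pick up the new sync replication state.
   */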
  @VisibleForTesting
  protected void reopenRegions(MasterProcedureEnv env) {
    addChildProcedure(
      env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet().stream()
        .map(ReopenTableRegionsProcedure::new).toArray(ReopenTableRegionsProcedure[]::new));
  }

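  /**
   * Creates the per peer remote WAL directory under the remote WAL root dir on the WAL file
   * system. This runs when transiting to STANDBY, so the directory is ready for the active
   * cluster to write remote WALs into.
   */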
  @VisibleForTesting
  protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
    MasterFileSystem mfs = env.getMasterFileSystem();
    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
    FileSystem walFs = mfs.getWALFileSystem();
    if (walFs.exists(remoteWALDirForPeer)) {
      LOG.warn("Remote WAL dir {} already exists, which usually should not happen, continue anyway",
        remoteWALDirForPeer);
    } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
      throw new IOException("Failed to create remote WAL dir " + remoteWALDirForPeer);
    }
  }

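  /*
   * Pick the next state after the first round of RefreshPeerProcedures:
   * - ACTIVE -> STANDBY and DOWNGRADE_ACTIVE -> STANDBY remove all replication queues first.
   * - ACTIVE -> DOWNGRADE_ACTIVE and DOWNGRADE_ACTIVE -> ACTIVE reopen all regions in the peer.
   * - STANDBY -> DOWNGRADE_ACTIVE reopens all regions first for a serial peer, otherwise it
   *   replays the remote WALs directly.
   */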
  private void setNextStateAfterRefreshBegin() {
    if (fromState.equals(SyncReplicationState.ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      // For a serial peer we need to reopen all the regions and then update the last pushed
      // sequence ids before replaying any remote WALs, so that serial replication does not get
      // stuck and the order is guaranteed when replicating the remote WALs back.
      setNextState(serial ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
        : PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
    }
  }

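  /*
   * Pick the next state after the second round of RefreshPeerProcedures:
   * - to STANDBY: re-enable the peer if it was enabled before, otherwise go straight to creating
   *   the remote WAL directory.
   * - STANDBY -> DOWNGRADE_ACTIVE: re-enable the peer only if it is serial and was enabled,
   *   otherwise finish with the post transition step.
   * - everything else: finish with the post transition step.
   */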
  private void setNextStateAfterRefreshEnd() {
    if (toState == SyncReplicationState.STANDBY) {
      setNextState(
        enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
          : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
    } else if (fromState == SyncReplicationState.STANDBY) {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      setNextState(serial && enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    } else {
      setNextState(
        PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    }
  }

  private void replayRemoteWAL(boolean serial) {
    addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
  }

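  /**
   * Records the target sync replication state for the peer in the peer storage, disabling the
   * peer first when required (see the comments in the method body).
   */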
  @VisibleForTesting
  protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
      throws ReplicationException {
    if (toState.equals(SyncReplicationState.STANDBY) ||
      (fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled) {
      // Disable the peer if we are going to transit to STANDBY state, as we need to remove all
      // the pending replication files. If we do not disable the peer and just delete the WAL
      // queues on zk directly, the RS will get a NoNode exception when updating the WAL position
      // and crash.
      // Also disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE and the
      // replication is serial, as we need to update the last pushed sequence ids after we reopen
      // all the regions. For performance reasons we update them in batch, without CAS, so if the
      // RSes were still replicating we might accidentally move a last pushed sequence id back to
      // a smaller value and get replication stuck.
      env.getReplicationPeerManager().disablePeer(peerId);
    }
    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
  }

  @VisibleForTesting
  protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException {
    env.getReplicationPeerManager().removeAllQueues(peerId);
  }

  @VisibleForTesting
  protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
      throws ReplicationException {
    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
  }

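  /*
   * The main state machine. Steps that only touch the peer storage, the replication queues or the
   * file system are retried with backoff via suspend(); a failure in the very first pre transition
   * step is the only place where we give up and mark the procedure as failed.
   */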
  @Override
  protected Flow executeFromState(MasterProcedureEnv env,
      PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException {
    switch (state) {
      case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          preTransit(env);
        } catch (IOException e) {
          LOG.warn("Failed to call pre CP hook or the pre check failed for peer {} " +
            "when transiting sync replication peer state to {}, " +
            "mark the procedure as failed and give up", peerId, toState, e);
          setFailure("master-transit-peer-sync-replication-state", e);
          return Flow.NO_MORE_STATE;
        }
        setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case SET_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          setPeerNewSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when starting to transit sync " +
                "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshBegin();
        return Flow.HAS_MORE_STATE;
      case REOPEN_ALL_REGIONS_IN_PEER:
        reopenRegions(env);
        if (fromState.equals(SyncReplicationState.STANDBY)) {
          assert serial;
          setNextState(
            PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
        } else {
          setNextState(
            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        }
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
        try {
          setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
        } catch (Exception e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update last pushed sequence id for peer {} when transiting sync " +
                "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
        return Flow.HAS_MORE_STATE;
      case REPLAY_REMOTE_WAL_IN_PEER:
        replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
        setNextState(
          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
        try {
          removeAllReplicationQueues(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to remove all replication queues for peer {} when starting to transit" +
                " sync replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(fromState.equals(SyncReplicationState.ACTIVE)
          ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
          : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          transitPeerSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when finishing transiting sync " +
                "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshEnd();
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_SET_PEER_ENABLED:
        try {
          enablePeer(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to set peer enabled for peer {} when transiting sync replication peer " +
                "state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
        refreshPeer(env, PeerOperationType.ENABLE);
        setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
        return Flow.HAS_MORE_STATE;
      case CREATE_DIR_FOR_REMOTE_WAL:
        try {
          createDirForRemoteWAL(env);
        } catch (IOException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to create remote WAL dir for peer {} when transiting sync replication " +
                "peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
        return Flow.HAS_MORE_STATE;
      case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          postTransit(env);
        } catch (IOException e) {
          LOG.warn(
            "Failed to call post CP hook for peer {} when transiting sync replication " +
              "peer state from {} to {}, ignore it since the procedure has already completed",
            peerId, fromState, toState, e);
        }
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }
}