/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure.Flow;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;

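// A rough overview of the state flow, derived from executeFromState below; see that method for
// the exact branching:
//   PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
//     -> SET_PEER_NEW_SYNC_REPLICATION_STATE
//     -> REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN
//     -> depending on the (fromState, toState) pair and the serial flag, some of:
//        REMOVE_ALL_REPLICATION_QUEUES_IN_PEER, REOPEN_ALL_REGIONS_IN_PEER,
//        SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER, REPLAY_REMOTE_WAL_IN_PEER
//     -> TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
//     -> REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END
//     -> optionally SYNC_REPLICATION_SET_PEER_ENABLED,
//        SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS and CREATE_DIR_FOR_REMOTE_WAL
//     -> POST_PEER_SYNC_REPLICATION_STATE_TRANSITION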
/**
 * The procedure for transiting the current sync replication state of a synchronous replication
 * peer.
 */
@InterfaceAudience.Private
public class TransitPeerSyncReplicationStateProcedure
  extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {

  private static final Logger LOG =
    LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);

  protected SyncReplicationState fromState;

  private SyncReplicationState toState;

  private boolean enabled;

  private boolean serial;

  public TransitPeerSyncReplicationStateProcedure() {
  }

  public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
    super(peerId);
    this.toState = state;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData.Builder builder =
      TransitPeerSyncReplicationStateStateData.newBuilder()
        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
    if (fromState != null) {
      builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
    }
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData data =
      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
    toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
    if (data.hasFromState()) {
      fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
    }
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getState(int stateId) {
    return PeerSyncReplicationStateTransitionState.forNumber(stateId);
  }

  @Override
  protected int getStateId(PeerSyncReplicationStateTransitionState state) {
    return state.getNumber();
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getInitialState() {
    return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
  }

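  /**
   * Pre-check before the actual transition: call the pre CP hook, let the ReplicationPeerManager
   * validate the requested transition, verify that the remote WAL directory for the peer exists
   * when transiting to ACTIVE, and record the current sync replication state and the
   * enabled/serial flags for the later steps.
   */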
  protected void preTransit(MasterProcedureEnv env) throws IOException {
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
    }
    ReplicationPeerDescription desc =
      env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
    if (toState == SyncReplicationState.ACTIVE) {
      Path remoteWALDirForPeer =
        ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
      // check whether the remote wal directory is present
      if (
        !remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration()).exists(remoteWALDirForPeer)
      ) {
        throw new DoNotRetryIOException(
          "The remote WAL directory " + remoteWALDirForPeer + " does not exist");
      }
    }
    fromState = desc.getSyncReplicationState();
    enabled = desc.isEnabled();
    serial = desc.getPeerConfig().isSerial();
  }

  private void postTransit(MasterProcedureEnv env) throws IOException {
    LOG.info(
      "Successfully transited current cluster state from {} to {} for sync replication peer {}",
      fromState, toState, peerId);
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postTransitReplicationPeerSyncReplicationState(peerId, fromState, toState);
    }
  }

  protected void reopenRegions(MasterProcedureEnv env) {
    addChildProcedure(
      env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet().stream()
        .map(ReopenTableRegionsProcedure::new).toArray(ReopenTableRegionsProcedure[]::new));
  }

  protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
    MasterFileSystem mfs = env.getMasterFileSystem();
    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
    FileSystem walFs = mfs.getWALFileSystem();
    if (walFs.exists(remoteWALDirForPeer)) {
      LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
        remoteWALDirForPeer);
    } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
      throw new IOException("Failed to create remote wal dir " + remoteWALDirForPeer);
    }
  }

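  // The two helpers below pick the next state after the first and the second refresh of the
  // region servers, based on the (fromState, toState) pair and the serial/enabled flags recorded
  // in preTransit.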
  private void setNextStateAfterRefreshBegin() {
    if (fromState.equals(SyncReplicationState.ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      // For a serial peer, we need to reopen all the regions and then update the last pushed
      // sequence id before replaying any remote wals, so that the serial replication will not be
      // stuck, and also to guarantee the order when replicating the remote wals back.
      setNextState(serial
        ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
        : PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
    }
  }

  private void setNextStateAfterRefreshEnd() {
    if (toState == SyncReplicationState.STANDBY) {
      setNextState(enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
    } else if (fromState == SyncReplicationState.STANDBY) {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      setNextState(serial && enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    } else {
      setNextState(
        PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    }
  }

  private void replayRemoteWAL(boolean serial) {
    addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
  }

  protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
    throws ReplicationException {
    if (
      toState.equals(SyncReplicationState.STANDBY)
        || (fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled
    ) {
      // Disable the peer if we are going to transit to STANDBY state, as we need to remove all
      // the pending replication files. If we do not disable the peer and just delete the wal
      // queues on zk directly, the RS will get a NoNode exception when updating the wal position
      // and crash.
      // Also disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE and the
      // replication is serial, as we need to update the last pushed sequence id after we reopen
      // all the regions, and for performance reasons we will update it in batch, without using
      // CAS; if we are still replicating at the RS side, we may accidentally update the last
      // pushed sequence id to a smaller value and cause the replication to be stuck.
      env.getReplicationPeerManager().disablePeer(peerId);
    }
    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
  }

  protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException {
    env.getReplicationPeerManager().removeAllQueues(peerId);
  }

  protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
    throws ReplicationException {
    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
  }

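  // In executeFromState below, retriable failures (ReplicationException/IOException from the
  // individual steps) are turned into a suspend with backoff and retried, and resetRetry() clears
  // the backoff once a step succeeds. A failure in the pre-transition check fails the whole
  // procedure instead, while a failed post CP hook is only logged.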
  @Override
  protected Flow executeFromState(MasterProcedureEnv env,
    PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException {
    switch (state) {
      case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          if (
            env.getMasterServices().getProcedures().stream()
              .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
              .anyMatch(p -> !p.isFinished())
          ) {
            LOG.info("There is a pending {}, give up execution of {}",
              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
              getClass().getSimpleName());
            setFailure("master-transit-peer-sync-replication-state",
              new DoNotRetryIOException("There is a pending "
                + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
            return Flow.NO_MORE_STATE;
          }
          checkPeerModificationEnabled(env);
          preTransit(env);
        } catch (IOException e) {
          LOG.warn("Failed to call pre CP hook or the pre check failed for peer {} "
            + "when transiting sync replication peer state to {}, "
            + "mark the procedure as failure and give up", peerId, toState, e);
          setFailure("master-transit-peer-sync-replication-state", e);
          return Flow.NO_MORE_STATE;
        }
        setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case SET_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          setPeerNewSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when starting transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshBegin();
        return Flow.HAS_MORE_STATE;
      case REOPEN_ALL_REGIONS_IN_PEER:
        reopenRegions(env);
        if (fromState.equals(SyncReplicationState.STANDBY)) {
          assert serial;
          setNextState(
            PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
        } else {
          setNextState(
            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        }
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
        try {
          setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
        } catch (Exception e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update last pushed sequence id for peer {} when transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
        return Flow.HAS_MORE_STATE;
      case REPLAY_REMOTE_WAL_IN_PEER:
        replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
        setNextState(
          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
        try {
          removeAllReplicationQueues(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to remove all replication queues for peer {} when starting transiting"
                + " sync replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(fromState.equals(SyncReplicationState.ACTIVE)
          ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
          : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          transitPeerSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when ending transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshEnd();
        return Flow.HAS_MORE_STATE;
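      // The remaining states re-enable the peer if it was enabled before the transition and was
      // disabled in SET_PEER_NEW_SYNC_REPLICATION_STATE, make sure the remote WAL directory for
      // the peer exists, and finally call the post CP hook.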
      case SYNC_REPLICATION_SET_PEER_ENABLED:
        try {
          enablePeer(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to set peer enabled for peer {} when transiting sync replication peer "
                + "state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
        refreshPeer(env, PeerOperationType.ENABLE);
        setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
        return Flow.HAS_MORE_STATE;
      case CREATE_DIR_FOR_REMOTE_WAL:
        try {
          createDirForRemoteWAL(env);
        } catch (IOException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to create remote wal dir for peer {} when transiting sync replication "
                + "peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
        return Flow.HAS_MORE_STATE;
      case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          postTransit(env);
        } catch (IOException e) {
          LOG.warn(
            "Failed to call post CP hook for peer {} when transiting sync replication "
              + "peer state from {} to {}, ignore since the procedure is already done",
            peerId, fromState, toState, e);
        }
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }
}