/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;

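// Note: this procedure is scheduled by the active master when a sync replication state change is
// requested for a peer (typically via the Admin API); the state machine in executeFromState below
// drives the whole transition, including refreshing the peer on every region server.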
/**
 * The procedure for transiting the current sync replication state of a synchronous replication
 * peer.
 */
@InterfaceAudience.Private
public class TransitPeerSyncReplicationStateProcedure
  extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {

  private static final Logger LOG =
    LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);

  protected SyncReplicationState fromState;

  private SyncReplicationState toState;

  private boolean enabled;

  private boolean serial;

  public TransitPeerSyncReplicationStateProcedure() {
  }

  public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
    super(peerId);
    this.toState = state;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData.Builder builder =
      TransitPeerSyncReplicationStateStateData.newBuilder()
        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
    if (fromState != null) {
      builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
    }
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    TransitPeerSyncReplicationStateStateData data =
      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
    toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
    if (data.hasFromState()) {
      fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
    }
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getState(int stateId) {
    return PeerSyncReplicationStateTransitionState.forNumber(stateId);
  }

  @Override
  protected int getStateId(PeerSyncReplicationStateTransitionState state) {
    return state.getNumber();
  }

  @Override
  protected PeerSyncReplicationStateTransitionState getInitialState() {
    return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
  }

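  /**
   * Pre-checks before the transition starts: call the pre CP hook, let the ReplicationPeerManager
   * validate that the requested transition is allowed, and, when transiting to ACTIVE, verify that
   * the remote WAL directory for this peer exists. Also records the peer's current state and its
   * enabled/serial flags for use by the later steps.
   */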
  protected void preTransit(MasterProcedureEnv env) throws IOException {
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
    }
    ReplicationPeerDescription desc =
      env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
    if (toState == SyncReplicationState.ACTIVE) {
      Path remoteWALDirForPeer =
        ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
      // check whether the remote wal directory is present
      if (
        !remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration()).exists(remoteWALDirForPeer)
      ) {
        throw new DoNotRetryIOException(
          "The remote WAL directory " + remoteWALDirForPeer + " does not exist");
      }
    }
    fromState = desc.getSyncReplicationState();
    enabled = desc.isEnabled();
    serial = desc.getPeerConfig().isSerial();
  }

  private void postTransit(MasterProcedureEnv env) throws IOException {
    LOG.info(
      "Successfully transit current cluster state from {} to {} for sync replication peer {}",
      fromState, toState, peerId);
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postTransitReplicationPeerSyncReplicationState(peerId, fromState, toState);
    }
  }

  protected void reopenRegions(MasterProcedureEnv env) {
    addChildProcedure(
      env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet().stream()
        .map(ReopenTableRegionsProcedure::new).toArray(ReopenTableRegionsProcedure[]::new));
  }

  protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
    MasterFileSystem mfs = env.getMasterFileSystem();
    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
    FileSystem walFs = mfs.getWALFileSystem();
    if (walFs.exists(remoteWALDirForPeer)) {
      LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
        remoteWALDirForPeer);
    } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
      throw new IOException("Failed to create remote wal dir " + remoteWALDirForPeer);
    }
  }

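  /**
   * Decide the next state after all region servers have been told about the pending transition:
   * from ACTIVE or DOWNGRADE_ACTIVE we either remove the replication queues (when going to
   * STANDBY) or reopen the regions; from STANDBY (which can only go to DOWNGRADE_ACTIVE) we reopen
   * the regions first for a serial peer, otherwise we replay the remote WALs directly.
   */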
  private void setNextStateAfterRefreshBegin() {
    if (fromState.equals(SyncReplicationState.ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
      setNextState(toState.equals(SyncReplicationState.STANDBY)
        ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
        : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
    } else {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      // For a serial peer, we need to reopen all the regions and then update the last pushed
      // sequence id before replaying any remote wals, so that serial replication will not be
      // stuck, and also to guarantee the order when replicating the remote wal back.
      setNextState(serial
        ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
        : PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
    }
  }

  private void setNextStateAfterRefreshEnd() {
    if (toState == SyncReplicationState.STANDBY) {
      setNextState(enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
    } else if (fromState == SyncReplicationState.STANDBY) {
      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
      setNextState(serial && enabled
        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
        : PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    } else {
      setNextState(
        PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
    }
  }

  private void replayRemoteWAL(boolean serial) {
    addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
  }

  protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
    throws ReplicationException {
    if (
      toState.equals(SyncReplicationState.STANDBY)
        || (fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled
    ) {
      // Disable the peer if we are going to transit to STANDBY state, as we need to remove all the
      // pending replication files. If we do not disable the peer and just delete the wal queues on
      // zk directly, the RS will get a NoNode exception when updating the wal position and crash.
      // Also disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE and the
      // replication is serial, as we need to update the last pushed sequence id after we reopen
      // all the regions. For performance reasons we update in batch, without using CAS, so if the
      // RS side is still replicating, we may accidentally overwrite the last pushed sequence id
      // with a smaller value and cause the replication to be stuck.
      env.getReplicationPeerManager().disablePeer(peerId);
    }
    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
  }

  protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException {
    env.getReplicationPeerManager().removeAllQueues(peerId);
  }

  protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
    throws ReplicationException {
    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
  }

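  // A rough sketch of the state flow driven below:
  //   PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION (pre checks + pre CP hook)
  //     -> SET_PEER_NEW_SYNC_REPLICATION_STATE (record the pending state, maybe disable the peer)
  //     -> REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN (refresh the peer on all region servers)
  //     -> one or more of REMOVE_ALL_REPLICATION_QUEUES_IN_PEER, REOPEN_ALL_REGIONS_IN_PEER,
  //        SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER and
  //        REPLAY_REMOTE_WAL_IN_PEER, depending on the from/to states
  //        (see setNextStateAfterRefreshBegin)
  //     -> TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE (persist the new state)
  //     -> REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END (refresh the peer on all region servers
  //        again)
  //     -> optionally SYNC_REPLICATION_SET_PEER_ENABLED,
  //        SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS and CREATE_DIR_FOR_REMOTE_WAL
  //        (see setNextStateAfterRefreshEnd)
  //     -> POST_PEER_SYNC_REPLICATION_STATE_TRANSITION (post CP hook).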
  @Override
  protected Flow executeFromState(MasterProcedureEnv env,
    PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException {
    switch (state) {
      case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          if (
            env.getMasterServices().getProcedures().stream()
              .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
              .anyMatch(p -> !p.isFinished())
          ) {
            LOG.info("There is a pending {}, give up execution of {}",
              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
              getClass().getSimpleName());
            setFailure("master-transit-peer-sync-replication-state",
              new DoNotRetryIOException("There is a pending "
                + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
            return Flow.NO_MORE_STATE;
          }
          checkPeerModificationEnabled(env);
          preTransit(env);
        } catch (IOException e) {
          LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} "
            + "when transiting sync replication peer state to {}, "
            + "mark the procedure as failure and give up", peerId, toState, e);
          setFailure("master-transit-peer-sync-replication-state", e);
          return Flow.NO_MORE_STATE;
        }
        setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case SET_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          setPeerNewSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when starting transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshBegin();
        return Flow.HAS_MORE_STATE;
      case REOPEN_ALL_REGIONS_IN_PEER:
        reopenRegions(env);
        if (fromState.equals(SyncReplicationState.STANDBY)) {
          assert serial;
          setNextState(
            PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
        } else {
          setNextState(
            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        }
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
        try {
          setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
        } catch (Exception e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update last pushed sequence id for peer {} when transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
        return Flow.HAS_MORE_STATE;
      case REPLAY_REMOTE_WAL_IN_PEER:
        replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
        setNextState(
          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
      case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
        try {
          removeAllReplicationQueues(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to remove all replication queues for peer {} when starting transiting"
                + " sync replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(fromState.equals(SyncReplicationState.ACTIVE)
          ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
          : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        return Flow.HAS_MORE_STATE;
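      // From here on, the new state becomes the peer's current sync replication state in peer
      // storage, and every region server is refreshed once more to pick it up.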
      case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          transitPeerSyncReplicationState(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to update peer storage for peer {} when ending transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
          .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
          .toArray(RefreshPeerProcedure[]::new));
        setNextStateAfterRefreshEnd();
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_SET_PEER_ENABLED:
        try {
          enablePeer(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to set peer enabled for peer {} when transiting sync replication peer "
                + "state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
        refreshPeer(env, PeerOperationType.ENABLE);
        setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
        return Flow.HAS_MORE_STATE;
      case CREATE_DIR_FOR_REMOTE_WAL:
        try {
          createDirForRemoteWAL(env);
        } catch (IOException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "Failed to create remote wal dir for peer {} when transiting sync replication "
                + "peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, backoff / 1000, e));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
        return Flow.HAS_MORE_STATE;
      case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          postTransit(env);
        } catch (IOException e) {
          LOG.warn(
            "Failed to call post CP hook for peer {} when transiting sync replication "
              + "peer state from {} to {}, ignore since the procedure is already done",
            peerId, fromState, toState, e);
        }
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }
}