001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import org.apache.hadoop.hbase.DoNotRetryIOException; 023import org.apache.hadoop.hbase.TableName; 024import org.apache.hadoop.hbase.TableNotFoundException; 025import org.apache.hadoop.hbase.client.TableDescriptor; 026import org.apache.hadoop.hbase.client.TableState; 027import org.apache.hadoop.hbase.master.TableStateManager; 028import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 029import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; 030import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; 031import org.apache.hadoop.hbase.procedure2.Procedure; 032import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 033import org.apache.hadoop.hbase.replication.ReplicationException; 034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; 040 041/** 042 * The base class for all replication peer related procedure except sync replication state 043 * transition. 044 */ 045@InterfaceAudience.Private 046public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> { 047 048 private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class); 049 050 protected ModifyPeerProcedure() { 051 } 052 053 protected ModifyPeerProcedure(String peerId) { 054 super(peerId); 055 } 056 057 /** 058 * Called before we start the actual processing. The implementation should call the pre CP hook, 059 * and also the pre-check for the peer modification. 060 * <p> 061 * If an IOException is thrown then we will give up and mark the procedure as failed directly. If 062 * all checks passes then the procedure can not be rolled back any more. 063 */ 064 protected abstract void prePeerModification(MasterProcedureEnv env) 065 throws IOException, ReplicationException, ProcedureSuspendedException; 066 067 protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException; 068 069 /** 070 * Called before we finish the procedure. The implementation can do some logging work, and also 071 * call the coprocessor hook if any. 072 * <p> 073 * Notice that, since we have already done the actual work, throwing {@code IOException} here will 074 * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If 075 * {@code ReplicationException} is thrown we will retry since this usually means we fails to 076 * update the peer storage. 077 */ 078 protected abstract void postPeerModification(MasterProcedureEnv env) 079 throws IOException, ReplicationException, ProcedureSuspendedException; 080 081 protected void releaseLatch(MasterProcedureEnv env) { 082 ProcedurePrepareLatch.releaseLatch(latch, this); 083 } 084 085 /** 086 * Implementation class can override this method. By default we will jump to 087 * POST_PEER_MODIFICATION and finish the procedure. 088 */ 089 protected PeerModificationState nextStateAfterRefresh() { 090 return PeerModificationState.POST_PEER_MODIFICATION; 091 } 092 093 /** 094 * The implementation class should override this method if the procedure may enter the serial 095 * related states. 096 */ 097 protected boolean enablePeerBeforeFinish() { 098 throw new UnsupportedOperationException(); 099 } 100 101 protected ReplicationPeerConfig getOldPeerConfig() { 102 return null; 103 } 104 105 protected ReplicationPeerConfig getNewPeerConfig() { 106 throw new UnsupportedOperationException(); 107 } 108 109 protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env) 110 throws IOException, ReplicationException { 111 throw new UnsupportedOperationException(); 112 } 113 114 // If the table is in enabling state, we need to wait until it is enabled and then reopen all its 115 // regions. 116 private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException { 117 for (;;) { 118 try { 119 TableState state = tsm.getTableState(tn); 120 if (state.isEnabled()) { 121 return true; 122 } 123 if (!state.isEnabling()) { 124 return false; 125 } 126 Thread.sleep(SLEEP_INTERVAL_MS); 127 } catch (TableNotFoundException e) { 128 return false; 129 } catch (InterruptedException e) { 130 throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e); 131 } 132 } 133 } 134 135 // will be override in test to simulate error 136 protected void reopenRegions(MasterProcedureEnv env) throws IOException { 137 ReplicationPeerConfig peerConfig = getNewPeerConfig(); 138 ReplicationPeerConfig oldPeerConfig = getOldPeerConfig(); 139 TableStateManager tsm = env.getMasterServices().getTableStateManager(); 140 for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) { 141 if (!td.hasGlobalReplicationScope()) { 142 continue; 143 } 144 TableName tn = td.getTableName(); 145 if (!peerConfig.needToReplicate(tn)) { 146 continue; 147 } 148 if (oldPeerConfig != null && oldPeerConfig.isSerial() && oldPeerConfig.needToReplicate(tn)) { 149 continue; 150 } 151 if (needReopen(tsm, tn)) { 152 addChildProcedure(new ReopenTableRegionsProcedure(tn)); 153 } 154 } 155 } 156 157 private boolean shouldFailForMigrating(MasterProcedureEnv env) throws IOException { 158 long parentProcId = getParentProcId(); 159 if ( 160 parentProcId != Procedure.NO_PROC_ID && env.getMasterServices().getMasterProcedureExecutor() 161 .getProcedure(parentProcId) instanceof MigrateReplicationQueueFromZkToTableProcedure 162 ) { 163 // this is scheduled by MigrateReplicationQueueFromZkToTableProcedure, should not fail it 164 return false; 165 } 166 return env.getMasterServices().getProcedures().stream() 167 .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure) 168 .anyMatch(p -> !p.isFinished()); 169 } 170 171 @Override 172 protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) 173 throws ProcedureSuspendedException, InterruptedException { 174 switch (state) { 175 case PRE_PEER_MODIFICATION: 176 try { 177 if (shouldFailForMigrating(env)) { 178 LOG.info("There is a pending {}, give up execution of {}", 179 MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(), 180 getClass().getName()); 181 setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", 182 new DoNotRetryIOException("There is a pending " 183 + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName())); 184 releaseLatch(env); 185 return Flow.NO_MORE_STATE; 186 } 187 checkPeerModificationEnabled(env); 188 prePeerModification(env); 189 } catch (IOException e) { 190 LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " 191 + "mark the procedure as failure and give up", getClass().getName(), peerId, e); 192 setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); 193 releaseLatch(env); 194 return Flow.NO_MORE_STATE; 195 } catch (ReplicationException e) { 196 throw suspend(env.getMasterConfiguration(), 197 backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs", 198 getClass().getName(), peerId, backoff / 1000, e)); 199 } 200 resetRetry(); 201 setNextState(PeerModificationState.UPDATE_PEER_STORAGE); 202 return Flow.HAS_MORE_STATE; 203 case UPDATE_PEER_STORAGE: 204 try { 205 updatePeerStorage(env); 206 } catch (ReplicationException e) { 207 throw suspend(env.getMasterConfiguration(), 208 backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", 209 getClass().getName(), peerId, backoff / 1000, e)); 210 } 211 resetRetry(); 212 setNextState(PeerModificationState.REFRESH_PEER_ON_RS); 213 return Flow.HAS_MORE_STATE; 214 case REFRESH_PEER_ON_RS: 215 refreshPeer(env, getPeerOperationType()); 216 setNextState(nextStateAfterRefresh()); 217 return Flow.HAS_MORE_STATE; 218 case SERIAL_PEER_REOPEN_REGIONS: 219 try { 220 reopenRegions(env); 221 } catch (Exception e) { 222 throw suspend(env.getMasterConfiguration(), 223 backoff -> LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", 224 getClass().getName(), peerId, backoff / 1000, e)); 225 } 226 resetRetry(); 227 setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID); 228 return Flow.HAS_MORE_STATE; 229 case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID: 230 try { 231 updateLastPushedSequenceIdForSerialPeer(env); 232 } catch (Exception e) { 233 throw suspend(env.getMasterConfiguration(), 234 backoff -> LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs", 235 getClass().getName(), peerId, backoff / 1000, e)); 236 } 237 resetRetry(); 238 setNextState(enablePeerBeforeFinish() 239 ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED 240 : PeerModificationState.POST_PEER_MODIFICATION); 241 return Flow.HAS_MORE_STATE; 242 case SERIAL_PEER_SET_PEER_ENABLED: 243 try { 244 enablePeer(env); 245 } catch (ReplicationException e) { 246 throw suspend(env.getMasterConfiguration(), 247 backoff -> LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs", 248 getClass().getName(), peerId, backoff / 1000, e)); 249 } 250 resetRetry(); 251 setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS); 252 return Flow.HAS_MORE_STATE; 253 case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS: 254 refreshPeer(env, PeerOperationType.ENABLE); 255 setNextState(PeerModificationState.POST_PEER_MODIFICATION); 256 return Flow.HAS_MORE_STATE; 257 case POST_PEER_MODIFICATION: 258 try { 259 postPeerModification(env); 260 } catch (ReplicationException e) { 261 throw suspend(env.getMasterConfiguration(), 262 backoff -> LOG.warn( 263 "{} failed to call postPeerModification for peer {}, sleep {} secs", 264 getClass().getName(), peerId, backoff / 1000, e)); 265 } catch (IOException e) { 266 LOG.warn("{} failed to call post CP hook for peer {}, " 267 + "ignore since the procedure has already done", getClass().getName(), peerId, e); 268 } 269 releaseLatch(env); 270 return Flow.NO_MORE_STATE; 271 default: 272 throw new UnsupportedOperationException("unhandled state=" + state); 273 } 274 } 275 276 @Override 277 protected PeerModificationState getState(int stateId) { 278 return PeerModificationState.forNumber(stateId); 279 } 280 281 @Override 282 protected int getStateId(PeerModificationState state) { 283 return state.getNumber(); 284 } 285 286 @Override 287 protected PeerModificationState getInitialState() { 288 return PeerModificationState.PRE_PEER_MODIFICATION; 289 } 290}