/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.LongConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * The base class for all replication peer related procedure except sync replication state
 * transition.
 * <p>
 * The state machine implemented by {@link #executeFromState} is:
 *
 * <pre>
 * PRE_PEER_MODIFICATION
 *   -&gt; UPDATE_PEER_STORAGE
 *   -&gt; REFRESH_PEER_ON_RS
 *   -&gt; nextStateAfterRefresh()            (POST_PEER_MODIFICATION by default)
 *
 * SERIAL_PEER_REOPEN_REGIONS
 *   -&gt; SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID
 *   -&gt; SERIAL_PEER_SET_PEER_ENABLED       (only if enablePeerBeforeFinish() is true)
 *   -&gt; SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS
 *   -&gt; POST_PEER_MODIFICATION
 * </pre>
 *
 * Steps which fail with a retriable error suspend the procedure and are retried later with a
 * backoff, see {@link #suspend}.
 */
@InterfaceAudience.Private
public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {

  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);

  // Once this many entries have been collected we flush them to the queue storage, see addToMap.
  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;

  // The sleep interval when waiting table to be enabled or disabled.
  protected static final int SLEEP_INTERVAL_MS = 1000;

  // Created lazily on the first failure of a step, and reset to null once the step succeeds so
  // that the next step starts with a fresh backoff.
  private RetryCounter retryCounter;

  protected ModifyPeerProcedure() {
  }

  protected ModifyPeerProcedure(String peerId) {
    super(peerId);
  }

  /**
   * Called before we start the actual processing. The implementation should call the pre CP hook,
   * and also the pre-check for the peer modification.
   * <p>
   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
   * a {@code ReplicationException} is thrown we will suspend the procedure and retry later. If
   * all checks pass then the procedure can not be rolled back any more.
   */
  protected abstract void prePeerModification(MasterProcedureEnv env)
      throws IOException, ReplicationException;

  /**
   * Called in the UPDATE_PEER_STORAGE state. If a {@code ReplicationException} is thrown we will
   * suspend the procedure and retry later.
   */
  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;

  /**
   * Called before we finish the procedure. The implementation can do some logging work, and also
   * call the coprocessor hook if any.
   * <p>
   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
   * not fail this procedure, we will just ignore it and finish the procedure as succeeded. If
   * {@code ReplicationException} is thrown we will retry since this usually means we fail to
   * update the peer storage.
   */
  protected abstract void postPeerModification(MasterProcedureEnv env)
      throws IOException, ReplicationException;

  /**
   * Releases the prepare latch of this procedure. Called on both terminal paths of
   * {@link #executeFromState}: when the pre check fails and when the post hook has been called.
   */
  private void releaseLatch() {
    ProcedurePrepareLatch.releaseLatch(latch, this);
  }

  /**
   * Implementation class can override this method. By default we will jump to
   * POST_PEER_MODIFICATION and finish the procedure.
   */
  protected PeerModificationState nextStateAfterRefresh() {
    return PeerModificationState.POST_PEER_MODIFICATION;
  }

  /**
   * The implementation class should override this method if the procedure may enter the serial
   * related states.
   * @return {@code true} if the peer should be enabled, and the enabled state refreshed on the
   *         region servers, before jumping to POST_PEER_MODIFICATION
   */
  protected boolean enablePeerBeforeFinish() {
    throw new UnsupportedOperationException();
  }

  /**
   * Schedules one {@link RefreshPeerProcedure} child procedure for every currently online server.
   */
  private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
      .map(sn -> new RefreshPeerProcedure(peerId, type, sn))
      .toArray(RefreshPeerProcedure[]::new));
  }

  /**
   * @return the peer config before the modification, {@code null} by default.
   *         {@link #reopenRegions} treats {@code null} as "there is no old config".
   */
  protected ReplicationPeerConfig getOldPeerConfig() {
    return null;
  }

  /**
   * The implementation class should override this method if the procedure may enter the serial
   * related states, as it is used by {@link #reopenRegions}.
   */
  protected ReplicationPeerConfig getNewPeerConfig() {
    throw new UnsupportedOperationException();
  }

  /**
   * The implementation class should override this method if the procedure may enter the
   * SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID state. Any exception thrown here will cause the
   * procedure to be suspended and the state to be retried later.
   */
  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
      throws IOException, ReplicationException {
    throw new UnsupportedOperationException();
  }

  /**
   * If the table is in enabling state, we need to wait until it is enabled and then reopen all its
   * regions. We poll the table state every {@link #SLEEP_INTERVAL_MS} milliseconds while it is
   * still enabling.
   * @return {@code true} if the table is enabled, {@code false} if it is in any other state than
   *         enabled or enabling, or if the table can not be found
   * @throws InterruptedIOException if we are interrupted while waiting
   */
  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
    for (;;) {
      try {
        TableState state = tsm.getTableState(tn);
        if (state.isEnabled()) {
          return true;
        }
        if (!state.isEnabling()) {
          return false;
        }
        Thread.sleep(SLEEP_INTERVAL_MS);
      } catch (TableNotFoundException e) {
        return false;
      } catch (InterruptedException e) {
        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
      }
    }
  }

  /**
   * Schedules a {@link ReopenTableRegionsProcedure} child procedure for every table which has
   * global replication scope, is replicated by the new peer config, and is enabled, see
   * {@link #needReopen}.
   * <p>
   * May be overridden in tests to simulate errors.
   */
  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
    ReplicationPeerConfig peerConfig = getNewPeerConfig();
    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
    TableStateManager tsm = env.getMasterServices().getTableStateManager();
    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
      if (!td.hasGlobalReplicationScope()) {
        continue;
      }
      TableName tn = td.getTableName();
      if (!peerConfig.needToReplicate(tn)) {
        continue;
      }
      // The table was already replicated by the old config and the old config was already
      // serial, so skip it. NOTE(review): presumably nothing changes for such a table so a
      // reopen is unnecessary, confirm.
      if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
        oldPeerConfig.needToReplicate(tn)) {
        continue;
      }
      if (needReopen(tsm, tn)) {
        addChildProcedure(new ReopenTableRegionsProcedure(tn));
      }
    }
  }

  /**
   * Marks the peer as enabled through the replication peer manager.
   * <p>
   * May be overridden in tests to simulate errors.
   */
  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
    env.getReplicationPeerManager().enablePeer(peerId);
  }

  /**
   * Records {@code barrier} as the last sequence id for the given region if it is not negative,
   * and writes out and clears the whole map once it holds at least
   * {@link #UPDATE_LAST_SEQ_ID_BATCH_SIZE} entries.
   */
  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
      ReplicationQueueStorage queueStorage) throws ReplicationException {
    if (barrier >= 0) {
      lastSeqIds.put(encodedRegionName, barrier);
      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
        lastSeqIds.clear();
      }
    }
  }

  /**
   * Sets the last pushed sequence ids for all the regions of all the tables which have global
   * replication scope and are replicated by the given peer config.
   */
  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
    Map<String, Long> lastSeqIds = new HashMap<>();
    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
      if (!td.hasGlobalReplicationScope()) {
        continue;
      }
      TableName tn = td.getTableName();
      if (!peerConfig.needToReplicate(tn)) {
        continue;
      }
      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
    }
    // Write out the entries which have not been flushed by the batching in addToMap yet.
    if (!lastSeqIds.isEmpty()) {
      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
    }
  }

  // If the table is currently disabling, then we need to wait until it is disabled. We will write
  // replication barrier for a disabled table. And return whether we need to update the last pushed
  // sequence id, if the table has been deleted already, i.e, we hit TableNotFoundException,
  // then we do not need to update last pushed sequence id for this table.
  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
      throws IOException {
    for (;;) {
      try {
        if (!tsm.getTableState(tn).isDisabling()) {
          return true;
        }
        Thread.sleep(SLEEP_INTERVAL_MS);
      } catch (TableNotFoundException e) {
        return false;
      } catch (InterruptedException e) {
        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
      }
    }
  }

  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
  // should not forget to check whether the map is empty at last, if not you should call
  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
    TableStateManager tsm = env.getMasterServices().getTableStateManager();
    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
    Connection conn = env.getMasterServices().getConnection();
    if (!needSetLastPushedSequenceId(tsm, tableName)) {
      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
      return;
    }
    for (Pair<String, Long> name2Barrier : MetaTableAccessor
      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
      // The value recorded is the last barrier minus one; addToMap drops it if that is negative.
      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
        queueStorage);
    }
  }

  /**
   * Called when the timeout set in {@link #suspend} expires. We do not treat this as a failure,
   * instead we make the procedure runnable again and put it at the front of the scheduler so the
   * suspended state is retried.
   * @return always {@code false}
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  /**
   * Suspends the procedure for the next backoff interval of the retry counter, which is created
   * on demand from the given configuration.
   * <p>
   * This method never returns normally. The return type is only declared so that callers can
   * write {@code throw suspend(...)} and the compiler knows that the control flow ends there.
   * @param conf used to create the retry counter if there is none yet
   * @param backoffConsumer called with the backoff in milliseconds before suspending, usually
   *          for logging
   * @throws ProcedureSuspendedException always
   */
  private ProcedureSuspendedException suspend(Configuration conf,
      LongConsumer backoffConsumer) throws ProcedureSuspendedException {
    if (retryCounter == null) {
      retryCounter = ProcedureUtil.createRetryCounter(conf);
    }
    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
    backoffConsumer.accept(backoff);
    setTimeout(Math.toIntExact(backoff));
    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
    skipPersistence();
    throw new ProcedureSuspendedException();
  }

  /**
   * Executes the step for the given state and selects the next state. See the class comment for
   * the transitions between the states.
   * @throws ProcedureSuspendedException if a step failed with a retriable error and we need to
   *           wait for a while before trying again
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
      throws ProcedureSuspendedException {
    switch (state) {
      case PRE_PEER_MODIFICATION:
        try {
          prePeerModification(env);
        } catch (IOException e) {
          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
            "mark the procedure as failure and give up", getClass().getName(), peerId, e);
          // Use Locale.ROOT so that the name does not depend on the default locale of the JVM,
          // for example a Turkish locale would turn 'I' into a dotless 'i'.
          String source =
            "master-" + getPeerOperationType().name().toLowerCase(Locale.ROOT) + "-peer";
          setFailure(source, e);
          releaseLatch();
          return Flow.NO_MORE_STATE;
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
              getClass().getName(), peerId, backoff / 1000, e));
        }
        retryCounter = null;
        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
        return Flow.HAS_MORE_STATE;
      case UPDATE_PEER_STORAGE:
        try {
          updatePeerStorage(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
              getClass().getName(), peerId, backoff / 1000, e));
        }
        retryCounter = null;
        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case REFRESH_PEER_ON_RS:
        refreshPeer(env, getPeerOperationType());
        setNextState(nextStateAfterRefresh());
        return Flow.HAS_MORE_STATE;
      case SERIAL_PEER_REOPEN_REGIONS:
        try {
          reopenRegions(env);
        } catch (Exception e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn("{} reopen regions for peer {} failed, sleep {} secs",
              getClass().getName(), peerId, backoff / 1000, e));
        }
        retryCounter = null;
        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
        return Flow.HAS_MORE_STATE;
      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
        try {
          updateLastPushedSequenceIdForSerialPeer(env);
        } catch (Exception e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
              getClass().getName(), peerId, backoff / 1000, e));
        }
        retryCounter = null;
        setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
          : PeerModificationState.POST_PEER_MODIFICATION);
        return Flow.HAS_MORE_STATE;
      case SERIAL_PEER_SET_PEER_ENABLED:
        try {
          enablePeer(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
              getClass().getName(), peerId, backoff / 1000, e));
        }
        retryCounter = null;
        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
        return Flow.HAS_MORE_STATE;
      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
        refreshPeer(env, PeerOperationType.ENABLE);
        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
        return Flow.HAS_MORE_STATE;
      case POST_PEER_MODIFICATION:
        try {
          postPeerModification(env);
        } catch (ReplicationException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "{} failed to call postPeerModification for peer {}, sleep {} secs",
              getClass().getName(), peerId, backoff / 1000, e));
        } catch (IOException e) {
          LOG.warn("{} failed to call post CP hook for peer {}, " +
            "ignore since the procedure has already done", getClass().getName(), peerId, e);
        }
        releaseLatch();
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  /**
   * Only the PRE_PEER_MODIFICATION state can be rolled back, and there is nothing to undo for it.
   * @throws UnsupportedOperationException for any other state
   */
  @Override
  protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
      throws IOException, InterruptedException {
    if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
      // actually the peer related operations has no rollback, but if we haven't done any
      // modifications on the peer storage yet, we can just return.
      return;
    }
    throw new UnsupportedOperationException();
  }

  @Override
  protected PeerModificationState getState(int stateId) {
    return PeerModificationState.forNumber(stateId);
  }

  @Override
  protected int getStateId(PeerModificationState state) {
    return state.getNumber();
  }

  @Override
  protected PeerModificationState getInitialState() {
    return PeerModificationState.PRE_PEER_MODIFICATION;
  }
}