001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.function.LongConsumer; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.MetaTableAccessor; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.TableNotFoundException; 029import org.apache.hadoop.hbase.client.Connection; 030import org.apache.hadoop.hbase.client.TableDescriptor; 031import org.apache.hadoop.hbase.client.TableState; 032import org.apache.hadoop.hbase.master.TableStateManager; 033import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 034import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; 035import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; 036import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 037import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 038import org.apache.hadoop.hbase.replication.ReplicationException; 039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 040import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 041import org.apache.hadoop.hbase.util.Pair; 042import org.apache.hadoop.hbase.util.RetryCounter; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 048 049import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 051 052/** 053 * The base class for all replication peer related procedure except sync replication state 054 * transition. 055 */ 056@InterfaceAudience.Private 057public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> { 058 059 private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class); 060 061 protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000; 062 063 // The sleep interval when waiting table to be enabled or disabled. 064 protected static final int SLEEP_INTERVAL_MS = 1000; 065 066 private RetryCounter retryCounter; 067 068 protected ModifyPeerProcedure() { 069 } 070 071 protected ModifyPeerProcedure(String peerId) { 072 super(peerId); 073 } 074 075 /** 076 * Called before we start the actual processing. The implementation should call the pre CP hook, 077 * and also the pre-check for the peer modification. 078 * <p> 079 * If an IOException is thrown then we will give up and mark the procedure as failed directly. If 080 * all checks passes then the procedure can not be rolled back any more. 081 */ 082 protected abstract void prePeerModification(MasterProcedureEnv env) 083 throws IOException, ReplicationException; 084 085 protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException; 086 087 /** 088 * Called before we finish the procedure. The implementation can do some logging work, and also 089 * call the coprocessor hook if any. 090 * <p> 091 * Notice that, since we have already done the actual work, throwing {@code IOException} here will 092 * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If 093 * {@code ReplicationException} is thrown we will retry since this usually means we fails to 094 * update the peer storage. 095 */ 096 protected abstract void postPeerModification(MasterProcedureEnv env) 097 throws IOException, ReplicationException; 098 099 private void releaseLatch() { 100 ProcedurePrepareLatch.releaseLatch(latch, this); 101 } 102 103 /** 104 * Implementation class can override this method. By default we will jump to 105 * POST_PEER_MODIFICATION and finish the procedure. 106 */ 107 protected PeerModificationState nextStateAfterRefresh() { 108 return PeerModificationState.POST_PEER_MODIFICATION; 109 } 110 111 /** 112 * The implementation class should override this method if the procedure may enter the serial 113 * related states. 114 */ 115 protected boolean enablePeerBeforeFinish() { 116 throw new UnsupportedOperationException(); 117 } 118 119 private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) { 120 addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() 121 .map(sn -> new RefreshPeerProcedure(peerId, type, sn)) 122 .toArray(RefreshPeerProcedure[]::new)); 123 } 124 125 protected ReplicationPeerConfig getOldPeerConfig() { 126 return null; 127 } 128 129 protected ReplicationPeerConfig getNewPeerConfig() { 130 throw new UnsupportedOperationException(); 131 } 132 133 protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env) 134 throws IOException, ReplicationException { 135 throw new UnsupportedOperationException(); 136 } 137 138 // If the table is in enabling state, we need to wait until it is enabled and then reopen all its 139 // regions. 140 private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException { 141 for (;;) { 142 try { 143 TableState state = tsm.getTableState(tn); 144 if (state.isEnabled()) { 145 return true; 146 } 147 if (!state.isEnabling()) { 148 return false; 149 } 150 Thread.sleep(SLEEP_INTERVAL_MS); 151 } catch (TableNotFoundException e) { 152 return false; 153 } catch (InterruptedException e) { 154 throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e); 155 } 156 } 157 } 158 159 // will be override in test to simulate error 160 @VisibleForTesting 161 protected void reopenRegions(MasterProcedureEnv env) throws IOException { 162 ReplicationPeerConfig peerConfig = getNewPeerConfig(); 163 ReplicationPeerConfig oldPeerConfig = getOldPeerConfig(); 164 TableStateManager tsm = env.getMasterServices().getTableStateManager(); 165 for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) { 166 if (!td.hasGlobalReplicationScope()) { 167 continue; 168 } 169 TableName tn = td.getTableName(); 170 if (!peerConfig.needToReplicate(tn)) { 171 continue; 172 } 173 if (oldPeerConfig != null && oldPeerConfig.isSerial() && 174 oldPeerConfig.needToReplicate(tn)) { 175 continue; 176 } 177 if (needReopen(tsm, tn)) { 178 addChildProcedure(new ReopenTableRegionsProcedure(tn)); 179 } 180 } 181 } 182 183 // will be override in test to simulate error 184 @VisibleForTesting 185 protected void enablePeer(MasterProcedureEnv env) throws ReplicationException { 186 env.getReplicationPeerManager().enablePeer(peerId); 187 } 188 189 private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier, 190 ReplicationQueueStorage queueStorage) throws ReplicationException { 191 if (barrier >= 0) { 192 lastSeqIds.put(encodedRegionName, barrier); 193 if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) { 194 queueStorage.setLastSequenceIds(peerId, lastSeqIds); 195 lastSeqIds.clear(); 196 } 197 } 198 } 199 200 protected final void setLastPushedSequenceId(MasterProcedureEnv env, 201 ReplicationPeerConfig peerConfig) throws IOException, ReplicationException { 202 Map<String, Long> lastSeqIds = new HashMap<String, Long>(); 203 for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) { 204 if (!td.hasGlobalReplicationScope()) { 205 continue; 206 } 207 TableName tn = td.getTableName(); 208 if (!peerConfig.needToReplicate(tn)) { 209 continue; 210 } 211 setLastPushedSequenceIdForTable(env, tn, lastSeqIds); 212 } 213 if (!lastSeqIds.isEmpty()) { 214 env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds); 215 } 216 } 217 218 // If the table is currently disabling, then we need to wait until it is disabled.We will write 219 // replication barrier for a disabled table. And return whether we need to update the last pushed 220 // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException, 221 // then we do not need to update last pushed sequence id for this table. 222 private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn) 223 throws IOException { 224 for (;;) { 225 try { 226 if (!tsm.getTableState(tn).isDisabling()) { 227 return true; 228 } 229 Thread.sleep(SLEEP_INTERVAL_MS); 230 } catch (TableNotFoundException e) { 231 return false; 232 } catch (InterruptedException e) { 233 throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e); 234 } 235 } 236 } 237 238 // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is 239 // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller 240 // should not forget to check whether the map is empty at last, if not you should call 241 // queueStorage.setLastSequenceIds to write out the remaining entries in the map. 242 protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName, 243 Map<String, Long> lastSeqIds) throws IOException, ReplicationException { 244 TableStateManager tsm = env.getMasterServices().getTableStateManager(); 245 ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage(); 246 Connection conn = env.getMasterServices().getConnection(); 247 if (!needSetLastPushedSequenceId(tsm, tableName)) { 248 LOG.debug("Skip settting last pushed sequence id for {}", tableName); 249 return; 250 } 251 for (Pair<String, Long> name2Barrier : MetaTableAccessor 252 .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) { 253 LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier); 254 addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1, 255 queueStorage); 256 } 257 } 258 259 @Override 260 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 261 setState(ProcedureProtos.ProcedureState.RUNNABLE); 262 env.getProcedureScheduler().addFront(this); 263 return false; 264 } 265 266 private ProcedureSuspendedException suspend(Configuration conf, 267 LongConsumer backoffConsumer) throws ProcedureSuspendedException { 268 if (retryCounter == null) { 269 retryCounter = ProcedureUtil.createRetryCounter(conf); 270 } 271 long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); 272 backoffConsumer.accept(backoff); 273 setTimeout(Math.toIntExact(backoff)); 274 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 275 skipPersistence(); 276 throw new ProcedureSuspendedException(); 277 } 278 279 @Override 280 protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) 281 throws ProcedureSuspendedException { 282 switch (state) { 283 case PRE_PEER_MODIFICATION: 284 try { 285 prePeerModification(env); 286 } catch (IOException e) { 287 LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " + 288 "mark the procedure as failure and give up", getClass().getName(), peerId, e); 289 setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); 290 releaseLatch(); 291 return Flow.NO_MORE_STATE; 292 } catch (ReplicationException e) { 293 throw suspend(env.getMasterConfiguration(), 294 backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs", 295 getClass().getName(), peerId, backoff / 1000, e)); 296 } 297 retryCounter = null; 298 setNextState(PeerModificationState.UPDATE_PEER_STORAGE); 299 return Flow.HAS_MORE_STATE; 300 case UPDATE_PEER_STORAGE: 301 try { 302 updatePeerStorage(env); 303 } catch (ReplicationException e) { 304 throw suspend(env.getMasterConfiguration(), 305 backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", 306 getClass().getName(), peerId, backoff / 1000, e)); 307 } 308 retryCounter = null; 309 setNextState(PeerModificationState.REFRESH_PEER_ON_RS); 310 return Flow.HAS_MORE_STATE; 311 case REFRESH_PEER_ON_RS: 312 refreshPeer(env, getPeerOperationType()); 313 setNextState(nextStateAfterRefresh()); 314 return Flow.HAS_MORE_STATE; 315 case SERIAL_PEER_REOPEN_REGIONS: 316 try { 317 reopenRegions(env); 318 } catch (Exception e) { 319 throw suspend(env.getMasterConfiguration(), 320 backoff -> LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", 321 getClass().getName(), peerId, backoff / 1000, e)); 322 } 323 retryCounter = null; 324 setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID); 325 return Flow.HAS_MORE_STATE; 326 case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID: 327 try { 328 updateLastPushedSequenceIdForSerialPeer(env); 329 } catch (Exception e) { 330 throw suspend(env.getMasterConfiguration(), 331 backoff -> LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs", 332 getClass().getName(), peerId, backoff / 1000, e)); 333 } 334 retryCounter = null; 335 setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED 336 : PeerModificationState.POST_PEER_MODIFICATION); 337 return Flow.HAS_MORE_STATE; 338 case SERIAL_PEER_SET_PEER_ENABLED: 339 try { 340 enablePeer(env); 341 } catch (ReplicationException e) { 342 throw suspend(env.getMasterConfiguration(), 343 backoff -> LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs", 344 getClass().getName(), peerId, backoff / 1000, e)); 345 } 346 retryCounter = null; 347 setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS); 348 return Flow.HAS_MORE_STATE; 349 case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS: 350 refreshPeer(env, PeerOperationType.ENABLE); 351 setNextState(PeerModificationState.POST_PEER_MODIFICATION); 352 return Flow.HAS_MORE_STATE; 353 case POST_PEER_MODIFICATION: 354 try { 355 postPeerModification(env); 356 } catch (ReplicationException e) { 357 throw suspend(env.getMasterConfiguration(), 358 backoff -> LOG.warn( 359 "{} failed to call postPeerModification for peer {}, sleep {} secs", 360 getClass().getName(), peerId, backoff / 1000, e)); 361 } catch (IOException e) { 362 LOG.warn("{} failed to call post CP hook for peer {}, " + 363 "ignore since the procedure has already done", getClass().getName(), peerId, e); 364 } 365 releaseLatch(); 366 return Flow.NO_MORE_STATE; 367 default: 368 throw new UnsupportedOperationException("unhandled state=" + state); 369 } 370 } 371 372 @Override 373 protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) 374 throws IOException, InterruptedException { 375 if (state == PeerModificationState.PRE_PEER_MODIFICATION) { 376 // actually the peer related operations has no rollback, but if we haven't done any 377 // modifications on the peer storage yet, we can just return. 378 return; 379 } 380 throw new UnsupportedOperationException(); 381 } 382 383 @Override 384 protected PeerModificationState getState(int stateId) { 385 return PeerModificationState.forNumber(stateId); 386 } 387 388 @Override 389 protected int getStateId(PeerModificationState state) { 390 return state.getNumber(); 391 } 392 393 @Override 394 protected PeerModificationState getInitialState() { 395 return PeerModificationState.PRE_PEER_MODIFICATION; 396 } 397}