/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * A procedure for migrating replication queue data from zookeeper to hbase:replication table.
 * <p>
 * The states are walked in the following order:
 * <ol>
 * <li>{@code DISABLE_CLEANER}: disable the replication log cleaner through its barrier, suspending
 * and retrying with backoff until that succeeds.</li>
 * <li>{@code PREPARE}: wait until no unfinished {@link PeerProcedureInterface} procedure is left,
 * then list the peers. If there is no peer, delete the old queue data on zookeeper and jump
 * straight to {@code ENABLE_CLEANER}; otherwise remember the ids of the peers which are currently
 * enabled.</li>
 * <li>{@code DISABLE_PEER}: schedule a {@link DisablePeerProcedure} for every remembered peer.</li>
 * <li>{@code MIGRATE}: start the queue migration on a private executor and hand the returned
 * future over to {@link ProcedureFutureUtil}.</li>
 * <li>{@code WAIT_UPGRADING}: suspend and re-check with backoff until no online region server
 * reports a major version lower than {@link #MIN_MAJOR_VERSION}.</li>
 * <li>{@code ENABLE_PEER}: schedule an {@link EnablePeerProcedure} for every remembered peer.</li>
 * <li>{@code ENABLE_CLEANER}: enable the replication log cleaner again and finish.</li>
 * </ol>
 * Rollback is not supported, see {@link #rollbackState}.
 */
@InterfaceAudience.Private
public class MigrateReplicationQueueFromZkToTableProcedure
  extends StateMachineProcedure<MasterProcedureEnv, MigrateReplicationQueueFromZkToTableState>
  implements GlobalProcedureInterface {

  private static final Logger LOG =
    LoggerFactory.getLogger(MigrateReplicationQueueFromZkToTableProcedure.class);

  /** Region servers reporting a major version below this value block the re-enabling of peers. */
  private static final int MIN_MAJOR_VERSION = 3;

  /**
   * Ids of the peers which were enabled when the PREPARE state ran. These are the peers this
   * procedure disables and later enables again. This is the only field which is persisted, see
   * {@link #serializeStateData(ProcedureStateSerializer)}.
   */
  private List<String> disabledPeerIds;

  /** Future of the in-flight migration, accessed by ProcedureFutureUtil through get/setFuture. */
  private CompletableFuture<Void> future;

  /** Lazily created executor passed to the migration call, shut down once migration finishes. */
  private ExecutorService executor;

  /** Lazily created backoff state shared by all retry loops, cleared by {@link #resetRetry()}. */
  private RetryCounter retryCounter;

  /**
   * Returns the simple class name as the global id of this procedure.
   */
  @Override
  public String getGlobalId() {
    return getClass().getSimpleName();
  }

  private CompletableFuture<Void> getFuture() {
    return future;
  }

  private void setFuture(CompletableFuture<Void> f) {
    future = f;
  }

  /**
   * Computes the next backoff and suspends this procedure for that long.
   * <p>
   * This method never returns normally. It is declared to return the exception only so that
   * callers can write {@code throw suspend(...)}, which lets the compiler see that the control
   * flow ends there.
   * @param conf            used to create the retry counter on first use
   * @param backoffConsumer receives the backoff in milliseconds before suspending, typically used
   *                        for logging
   * @throws ProcedureSuspendedException always
   * @throws ArithmeticException         if the backoff does not fit into an int
   */
  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
    throws ProcedureSuspendedException {
    if (retryCounter == null) {
      retryCounter = ProcedureUtil.createRetryCounter(conf);
    }
    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
    backoffConsumer.accept(backoff);
    // NOTE(review): the meaning of the 'true' flag of the inherited suspend(int, boolean) is not
    // visible in this file, confirm against Procedure before changing it.
    throw suspend(Math.toIntExact(backoff), true);
  }

  /**
   * Drops the current retry counter so that the next {@link #suspend(Configuration, LongConsumer)}
   * starts with a freshly created one.
   */
  private void resetRetry() {
    retryCounter = null;
  }

  /**
   * Returns the executor used for the migration, creating a cached pool of daemon threads named
   * after this class on first use.
   */
  private ExecutorService getExecutorService() {
    if (executor == null) {
      executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
        .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
    }
    return executor;
  }

  /**
   * Shuts down the migration executor if it has been created, and forgets it.
   */
  private void shutdownExecutorService() {
    if (executor != null) {
      executor.shutdown();
      executor = null;
    }
  }

  /**
   * Tries to disable the replication log cleaner through its barrier.
   * @throws ProcedureSuspendedException if the barrier can not be disabled right now, so that we
   *                                     will retry after a backoff
   */
  private void disableReplicationLogCleaner(MasterProcedureEnv env)
    throws ProcedureSuspendedException {
    if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
      // it is not likely that we can reach here as we will schedule this procedure immediately
      // after master restarting, where ReplicationLogCleaner should have not started its first run
      // yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
      // there will be no data in the new replication queue storage before we execute this procedure
      // so ReplicationLogCleaner will quit immediately without doing anything.
      throw suspend(env.getMasterConfiguration(),
        backoff -> LOG.info(
          "Can not disable replication log cleaner, sleep {} secs and retry later",
          backoff / 1000));
    }
    resetRetry();
  }

  /**
   * Enables the replication log cleaner again through its barrier.
   */
  private void enableReplicationLogCleaner(MasterProcedureEnv env) {
    env.getMasterServices().getReplicationLogCleanerBarrier().enable();
  }

  /**
   * Checks that there is no unfinished procedure implementing {@link PeerProcedureInterface}.
   * @throws ProcedureSuspendedException if the procedures can not be listed, or if at least one
   *                                     unfinished peer procedure exists, so that we will check
   *                                     again after a backoff
   */
  private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
    long peerProcCount;
    try {
      peerProcCount = env.getMasterServices().getProcedures().stream()
        .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
    } catch (IOException e) {
      throw suspend(env.getMasterConfiguration(),
        backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later",
          backoff / 1000, e));
    }
    if (peerProcCount > 0) {
      throw suspend(env.getMasterConfiguration(),
        backoff -> LOG.info(
          "There are still {} pending peer procedures, sleep {} secs and retry later",
          peerProcCount, backoff / 1000));
    }
    resetRetry();
    LOG.info("No pending peer procedures found, continue...");
  }

  /**
   * Callback handed to {@link ProcedureFutureUtil} for the MIGRATE state: releases the executor,
   * moves on to the WAIT_UPGRADING state and clears the retry counter.
   */
  private void finishMigration() {
    shutdownExecutorService();
    setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
    resetRetry();
  }

  /**
   * Executes one step of the state machine, see the class comment for the order of the states.
   * @param env   the master procedure environment
   * @param state the state to execute
   * @return {@link Flow#NO_MORE_STATE} after the ENABLE_CLEANER state, {@link Flow#HAS_MORE_STATE}
   *         otherwise
   * @throws ProcedureSuspendedException   if the current state has to wait and be executed again
   *                                       later
   * @throws UnsupportedOperationException if the given state is unknown
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env,
    MigrateReplicationQueueFromZkToTableState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    switch (state) {
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER:
        disableReplicationLogCleaner(env);
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE);
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
        waitUntilNoPeerProcedure(env);
        List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
        if (peers.isEmpty()) {
          LOG.info("No active replication peer found, delete old replication queue data and quit");
          ZKReplicationQueueStorageForMigration oldStorage =
            new ZKReplicationQueueStorageForMigration(env.getMasterServices().getZooKeeper(),
              env.getMasterConfiguration());
          try {
            oldStorage.deleteAllData();
          } catch (KeeperException e) {
            throw suspend(env.getMasterConfiguration(),
              backoff -> LOG.warn(
                "failed to delete old replication queue data, sleep {} secs and retry later",
                backoff / 1000, e));
          }
          // nothing to migrate, skip all the peer related states
          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
          return Flow.HAS_MORE_STATE;
        }
        // here we do not care the peers which have already been disabled, as later we do not need
        // to enable them
        disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
          .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
        resetRetry();
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
        for (String peerId : disabledPeerIds) {
          addChildProcedure(new DisablePeerProcedure(peerId));
        }
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
        try {
          // NOTE(review): presumably checkFuture returns true when a future stored by an earlier
          // execution has been consumed and finishMigration has been run, confirm against
          // ProcedureFutureUtil.
          if (
            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
              this::finishMigration)
          ) {
            return Flow.HAS_MORE_STATE;
          }
          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
            env.getReplicationPeerManager()
              .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()),
            env, this::finishMigration);
        } catch (IOException e) {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
              backoff / 1000, e));
        }
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
        long rsWithLowerVersion =
          env.getMasterServices().getServerManager().getOnlineServers().values().stream()
            .filter(sm -> VersionInfo.getMajorVersion(sm.getVersion()) < MIN_MAJOR_VERSION).count();
        if (rsWithLowerVersion == 0) {
          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
          return Flow.HAS_MORE_STATE;
        } else {
          throw suspend(env.getMasterConfiguration(),
            backoff -> LOG.warn(
              "There are still {} region servers which have a major version"
                + " less than {}, sleep {} secs and check later",
              rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
        }
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
        for (String peerId : disabledPeerIds) {
          addChildProcedure(new EnablePeerProcedure(peerId));
        }
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
        enableReplicationLogCleaner(env);
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  /**
   * Makes this procedure runnable again and puts it at the front of the procedure scheduler.
   * <p>
   * NOTE(review): presumably this is invoked when the timeout set by a suspend expires, and
   * returning false tells the framework not to treat the timeout as a failure. Confirm against
   * the contract of the overridden method.
   * @return always false
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  /**
   * Rollback is not supported for this procedure.
   * @throws UnsupportedOperationException always
   */
  @Override
  protected void rollbackState(MasterProcedureEnv env,
    MigrateReplicationQueueFromZkToTableState state) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  protected MigrateReplicationQueueFromZkToTableState getState(int stateId) {
    return MigrateReplicationQueueFromZkToTableState.forNumber(stateId);
  }

  @Override
  protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {
    return state.getNumber();
  }

  @Override
  protected MigrateReplicationQueueFromZkToTableState getInitialState() {
    return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
  }

  /**
   * Disables the replication log cleaner again after this procedure has been replayed, unless we
   * are still in the initial state.
   * @throws IllegalStateException if the replication log cleaner barrier can not be disabled
   */
  @Override
  protected void afterReplay(MasterProcedureEnv env) {
    if (getCurrentState() == getInitialState()) {
      // do not need to disable log cleaner or acquire lock if we are in the initial state, later
      // when executing the procedure we will try to disable and acquire.
      return;
    }
    if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
      throw new IllegalStateException("can not disable log cleaner, this should not happen");
    }
  }

  /**
   * Persists the ids of the peers disabled by this procedure, if they have been determined.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    MigrateReplicationQueueFromZkToTableStateData.Builder builder =
      MigrateReplicationQueueFromZkToTableStateData.newBuilder();
    if (disabledPeerIds != null) {
      builder.addAllDisabledPeerId(disabledPeerIds);
    }
    serializer.serialize(builder.build());
  }

  /**
   * Restores the ids of the peers disabled by this procedure, copied into a new list.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    MigrateReplicationQueueFromZkToTableStateData data =
      serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class);
    disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList());
  }
}