001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
021import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
022import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
023import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
024import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
025import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
026import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
027
028import java.io.IOException;
029import java.util.List;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.function.LongConsumer;
034import java.util.stream.Collectors;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
037import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
038import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
039import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
040import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
041import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
042import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
043import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
044import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
045import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
046import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
047import org.apache.hadoop.hbase.util.RetryCounter;
048import org.apache.hadoop.hbase.util.VersionInfo;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.apache.zookeeper.KeeperException;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
055
056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
059
060/**
061 * A procedure for migrating replication queue data from zookeeper to hbase:replication table.
062 */
063@InterfaceAudience.Private
064public class MigrateReplicationQueueFromZkToTableProcedure
065  extends StateMachineProcedure<MasterProcedureEnv, MigrateReplicationQueueFromZkToTableState>
066  implements GlobalProcedureInterface {
067
068  private static final Logger LOG =
069    LoggerFactory.getLogger(MigrateReplicationQueueFromZkToTableProcedure.class);
070
071  private static final int MIN_MAJOR_VERSION = 3;
072
073  private List<String> disabledPeerIds;
074
075  private CompletableFuture<Void> future;
076
077  private ExecutorService executor;
078
079  private RetryCounter retryCounter;
080
081  @Override
082  public String getGlobalId() {
083    return getClass().getSimpleName();
084  }
085
086  private CompletableFuture<Void> getFuture() {
087    return future;
088  }
089
090  private void setFuture(CompletableFuture<Void> f) {
091    future = f;
092  }
093
094  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
095    throws ProcedureSuspendedException {
096    if (retryCounter == null) {
097      retryCounter = ProcedureUtil.createRetryCounter(conf);
098    }
099    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
100    backoffConsumer.accept(backoff);
101    throw suspend(Math.toIntExact(backoff), true);
102  }
103
104  private void resetRetry() {
105    retryCounter = null;
106  }
107
108  private ExecutorService getExecutorService() {
109    if (executor == null) {
110      executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
111        .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
112    }
113    return executor;
114  }
115
116  private void shutdownExecutorService() {
117    if (executor != null) {
118      executor.shutdown();
119      executor = null;
120    }
121  }
122
123  private void disableReplicationLogCleaner(MasterProcedureEnv env)
124    throws ProcedureSuspendedException {
125    if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
126      // it is not likely that we can reach here as we will schedule this procedure immediately
127      // after master restarting, where ReplicationLogCleaner should have not started its first run
128      // yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
129      // there will be no data in the new replication queue storage before we execute this procedure
130      // so ReplicationLogCleaner will quit immediately without doing anything.
131      throw suspend(env.getMasterConfiguration(),
132        backoff -> LOG.info(
133          "Can not disable replication log cleaner, sleep {} secs and retry later",
134          backoff / 1000));
135    }
136    resetRetry();
137  }
138
139  private void enableReplicationLogCleaner(MasterProcedureEnv env) {
140    env.getMasterServices().getReplicationLogCleanerBarrier().enable();
141  }
142
143  private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
144    long peerProcCount;
145    try {
146      peerProcCount = env.getMasterServices().getProcedures().stream()
147        .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
148    } catch (IOException e) {
149      throw suspend(env.getMasterConfiguration(),
150        backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later",
151          backoff / 1000, e));
152    }
153    if (peerProcCount > 0) {
154      throw suspend(env.getMasterConfiguration(),
155        backoff -> LOG.info(
156          "There are still {} pending peer procedures, sleep {} secs and retry later",
157          peerProcCount, backoff / 1000));
158    }
159    resetRetry();
160    LOG.info("No pending peer procedures found, continue...");
161  }
162
163  private void finishMigartion() {
164    shutdownExecutorService();
165    setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
166    resetRetry();
167  }
168
169  @Override
170  protected Flow executeFromState(MasterProcedureEnv env,
171    MigrateReplicationQueueFromZkToTableState state)
172    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
173    switch (state) {
174      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER:
175        disableReplicationLogCleaner(env);
176        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE);
177        return Flow.HAS_MORE_STATE;
178      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
179        waitUntilNoPeerProcedure(env);
180        List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
181        if (peers.isEmpty()) {
182          LOG.info("No active replication peer found, delete old replication queue data and quit");
183          ZKReplicationQueueStorageForMigration oldStorage =
184            new ZKReplicationQueueStorageForMigration(env.getMasterServices().getZooKeeper(),
185              env.getMasterConfiguration());
186          try {
187            oldStorage.deleteAllData();
188          } catch (KeeperException e) {
189            throw suspend(env.getMasterConfiguration(),
190              backoff -> LOG.warn(
191                "failed to delete old replication queue data, sleep {} secs and retry later",
192                backoff / 1000, e));
193          }
194          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
195          return Flow.HAS_MORE_STATE;
196        }
197        // here we do not care the peers which have already been disabled, as later we do not need
198        // to enable them
199        disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
200          .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
201        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
202        resetRetry();
203        return Flow.HAS_MORE_STATE;
204      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
205        for (String peerId : disabledPeerIds) {
206          addChildProcedure(new DisablePeerProcedure(peerId));
207        }
208        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
209        return Flow.HAS_MORE_STATE;
210      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
211        try {
212          if (
213            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
214              this::finishMigartion)
215          ) {
216            return Flow.HAS_MORE_STATE;
217          }
218          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
219            env.getReplicationPeerManager()
220              .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()),
221            env, this::finishMigartion);
222        } catch (IOException e) {
223          throw suspend(env.getMasterConfiguration(),
224            backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
225              backoff / 1000, e));
226        }
227        return Flow.HAS_MORE_STATE;
228      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
229        long rsWithLowerVersion =
230          env.getMasterServices().getServerManager().getOnlineServers().values().stream()
231            .filter(sm -> VersionInfo.getMajorVersion(sm.getVersion()) < MIN_MAJOR_VERSION).count();
232        if (rsWithLowerVersion == 0) {
233          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
234          return Flow.HAS_MORE_STATE;
235        } else {
236          throw suspend(env.getMasterConfiguration(),
237            backoff -> LOG.warn(
238              "There are still {} region servers which have a major version"
239                + " less than {}, sleep {} secs and check later",
240              rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
241        }
242      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
243        for (String peerId : disabledPeerIds) {
244          addChildProcedure(new EnablePeerProcedure(peerId));
245        }
246        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
247        return Flow.HAS_MORE_STATE;
248      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
249        enableReplicationLogCleaner(env);
250        return Flow.NO_MORE_STATE;
251      default:
252        throw new UnsupportedOperationException("unhandled state=" + state);
253    }
254  }
255
256  @Override
257  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
258    setState(ProcedureProtos.ProcedureState.RUNNABLE);
259    env.getProcedureScheduler().addFront(this);
260    return false;
261  }
262
263  @Override
264  protected void rollbackState(MasterProcedureEnv env,
265    MigrateReplicationQueueFromZkToTableState state) throws IOException, InterruptedException {
266    throw new UnsupportedOperationException();
267  }
268
269  @Override
270  protected MigrateReplicationQueueFromZkToTableState getState(int stateId) {
271    return MigrateReplicationQueueFromZkToTableState.forNumber(stateId);
272  }
273
274  @Override
275  protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {
276    return state.getNumber();
277  }
278
279  @Override
280  protected MigrateReplicationQueueFromZkToTableState getInitialState() {
281    return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
282  }
283
284  @Override
285  protected void afterReplay(MasterProcedureEnv env) {
286    if (getCurrentState() == getInitialState()) {
287      // do not need to disable log cleaner or acquire lock if we are in the initial state, later
288      // when executing the procedure we will try to disable and acquire.
289      return;
290    }
291    if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
292      throw new IllegalStateException("can not disable log cleaner, this should not happen");
293    }
294  }
295
296  @Override
297  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
298    super.serializeStateData(serializer);
299    MigrateReplicationQueueFromZkToTableStateData.Builder builder =
300      MigrateReplicationQueueFromZkToTableStateData.newBuilder();
301    if (disabledPeerIds != null) {
302      builder.addAllDisabledPeerId(disabledPeerIds);
303    }
304    serializer.serialize(builder.build());
305  }
306
307  @Override
308  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
309    super.deserializeStateData(serializer);
310    MigrateReplicationQueueFromZkToTableStateData data =
311      serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class);
312    disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList());
313  }
314}