001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.hadoop.hbase.ServerName;
023import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
024import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
025import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
031import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData;
033
034/**
035 * The procedure for replaying a set of remote wals. It will get an available region server and
036 * schedule a {@link SyncReplicationReplayWALRemoteProcedure} to actually send the request to region
037 * server.
038 */
039@InterfaceAudience.Private
040public class SyncReplicationReplayWALProcedure
041  extends AbstractPeerNoLockProcedure<SyncReplicationReplayWALState> {
042
043  private static final Logger LOG =
044    LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class);
045
046  private ServerName worker = null;
047
048  private List<String> wals;
049
050  public SyncReplicationReplayWALProcedure() {
051  }
052
053  public SyncReplicationReplayWALProcedure(String peerId, List<String> wals) {
054    this.peerId = peerId;
055    this.wals = wals;
056  }
057
058  @Override
059  protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWALState state)
060    throws ProcedureSuspendedException {
061    SyncReplicationReplayWALManager syncReplicationReplayWALManager =
062      env.getMasterServices().getSyncReplicationReplayWALManager();
063    switch (state) {
064      case ASSIGN_WORKER:
065        worker = syncReplicationReplayWALManager.acquirePeerWorker(peerId, this);
066        setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
067        return Flow.HAS_MORE_STATE;
068      case DISPATCH_WALS_TO_WORKER:
069        addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker));
070        setNextState(SyncReplicationReplayWALState.RELEASE_WORKER);
071        return Flow.HAS_MORE_STATE;
072      case RELEASE_WORKER:
073        boolean finished = false;
074        try {
075          finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0));
076        } catch (IOException e) {
077          throw suspend(env.getMasterConfiguration(),
078            backoff -> LOG.warn("Failed to check whether replay wals {} finished for peer id={}"
079              + ", sleep {} secs and retry", wals, peerId, backoff / 1000, e));
080        }
081        syncReplicationReplayWALManager.releasePeerWorker(peerId, worker,
082          env.getProcedureScheduler());
083        if (!finished) {
084          LOG.warn("Failed to replay wals {} for peer id={}, retry", wals, peerId);
085          setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
086          return Flow.HAS_MORE_STATE;
087        }
088        return Flow.NO_MORE_STATE;
089      default:
090        throw new UnsupportedOperationException("unhandled state=" + state);
091    }
092  }
093
094  @Override
095  protected void rollbackState(MasterProcedureEnv env, SyncReplicationReplayWALState state)
096    throws IOException, InterruptedException {
097    if (state == getInitialState()) {
098      return;
099    }
100    throw new UnsupportedOperationException();
101  }
102
103  @Override
104  protected SyncReplicationReplayWALState getState(int state) {
105    return SyncReplicationReplayWALState.forNumber(state);
106  }
107
108  @Override
109  protected int getStateId(SyncReplicationReplayWALState state) {
110    return state.getNumber();
111  }
112
113  @Override
114  protected SyncReplicationReplayWALState getInitialState() {
115    return SyncReplicationReplayWALState.ASSIGN_WORKER;
116  }
117
118  @Override
119  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
120    super.serializeStateData(serializer);
121    SyncReplicationReplayWALStateData.Builder builder =
122      SyncReplicationReplayWALStateData.newBuilder().setPeerId(peerId).addAllWal(wals);
123    if (worker != null) {
124      builder.setWorker(ProtobufUtil.toServerName(worker));
125    }
126    serializer.serialize(builder.build());
127  }
128
129  @Override
130  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
131    super.deserializeStateData(serializer);
132    SyncReplicationReplayWALStateData data =
133      serializer.deserialize(SyncReplicationReplayWALStateData.class);
134    peerId = data.getPeerId();
135    wals = data.getWalList();
136    if (data.hasWorker()) {
137      worker = ProtobufUtil.toServerName(data.getWorker());
138    }
139  }
140
141  @Override
142  public PeerOperationType getPeerOperationType() {
143    return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL;
144  }
145
146  @Override
147  protected void afterReplay(MasterProcedureEnv env) {
148    // If the procedure is not finished and the worker is not null, we should add it to the used
149    // worker set, to prevent the worker being used by others.
150    if (worker != null && !isFinished()) {
151      env.getMasterServices().getSyncReplicationReplayWALManager().addUsedPeerWorker(peerId,
152        worker);
153    }
154  }
155}