001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Set;
026import java.util.stream.Collectors;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.master.MasterFileSystem;
030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
031import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
032import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
033import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
034import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
035import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
036import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
037import org.apache.hadoop.hbase.replication.ReplicationException;
038import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
039import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
040import org.apache.hadoop.hbase.replication.ReplicationQueueId;
041import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
042import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
043import org.apache.hadoop.hbase.util.RetryCounter;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AssignReplicationQueuesState;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AssignReplicationQueuesStateData;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
052
053@InterfaceAudience.Private
054public class AssignReplicationQueuesProcedure
055  extends StateMachineProcedure<MasterProcedureEnv, AssignReplicationQueuesState>
056  implements ServerProcedureInterface {
057
058  private static final Logger LOG = LoggerFactory.getLogger(AssignReplicationQueuesProcedure.class);
059
060  private ServerName crashedServer;
061
062  private RetryCounter retryCounter;
063
064  public AssignReplicationQueuesProcedure() {
065  }
066
067  public AssignReplicationQueuesProcedure(ServerName crashedServer) {
068    this.crashedServer = crashedServer;
069  }
070
071  @Override
072  public ServerName getServerName() {
073    return crashedServer;
074  }
075
076  @Override
077  public boolean hasMetaTableRegion() {
078    return false;
079  }
080
081  @Override
082  public ServerOperationType getServerOperationType() {
083    return ServerOperationType.CLAIM_REPLICATION_QUEUES;
084  }
085
086  private void addMissingQueues(MasterProcedureEnv env) throws ReplicationException {
087    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
088
089    Set<String> existingQueuePeerIds = new HashSet<>();
090    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer);
091    for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) {
092      ReplicationQueueId queueId = iter.next();
093      if (!queueId.isRecovered()) {
094        existingQueuePeerIds.add(queueId.getPeerId());
095      }
096    }
097    List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
098    for (ReplicationPeerDescription peer : peers) {
099      if (!existingQueuePeerIds.contains(peer.getPeerId())) {
100        ReplicationQueueId queueId = new ReplicationQueueId(crashedServer, peer.getPeerId());
101        LOG.debug("Add replication queue {} for claiming", queueId);
102        env.getReplicationPeerManager().getQueueStorage().setOffset(queueId,
103          crashedServer.toString(), ReplicationGroupOffset.BEGIN, Collections.emptyMap());
104      }
105    }
106  }
107
108  private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
109    Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
110      .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
111    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
112    // filter out replication queue for deleted peers
113    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer).stream()
114      .filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList());
115    if (queueIds.isEmpty()) {
116      LOG.debug("Finish claiming replication queues for {}", crashedServer);
117      // we are done
118      return Flow.NO_MORE_STATE;
119    }
120    LOG.debug("There are {} replication queues need to be claimed for {}", queueIds.size(),
121      crashedServer);
122    List<ServerName> targetServers =
123      env.getMasterServices().getServerManager().getOnlineServersList();
124    if (targetServers.isEmpty()) {
125      throw new ReplicationException("no region server available");
126    }
127    Collections.shuffle(targetServers);
128    for (int i = 0, n = Math.min(queueIds.size(), targetServers.size()); i < n; i++) {
129      addChildProcedure(
130        new ClaimReplicationQueueRemoteProcedure(queueIds.get(i), targetServers.get(i)));
131    }
132    retryCounter = null;
133    return Flow.HAS_MORE_STATE;
134  }
135
136  // check whether ReplicationSyncUp has already done the work for us, if so, we should skip
137  // claiming the replication queues and deleting them instead.
138  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
139    MasterFileSystem mfs = env.getMasterFileSystem();
140    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
141    return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName()));
142  }
143
144  private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
145    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
146    for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) {
147      storage.removeQueue(queueId);
148    }
149    MasterFileSystem mfs = env.getMasterFileSystem();
150    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
151    // remove the region server record file
152    mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false);
153  }
154
155  @Override
156  protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
157    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
158    try {
159      switch (state) {
160        case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
161          if (shouldSkip(env)) {
162            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
163            return Flow.HAS_MORE_STATE;
164          } else {
165            addMissingQueues(env);
166            retryCounter = null;
167            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
168            return Flow.HAS_MORE_STATE;
169          }
170        case ASSIGN_REPLICATION_QUEUES_CLAIM:
171          if (shouldSkip(env)) {
172            retryCounter = null;
173            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
174            return Flow.HAS_MORE_STATE;
175          } else {
176            return claimQueues(env);
177          }
178        case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES:
179          removeQueues(env);
180          return Flow.NO_MORE_STATE;
181        default:
182          throw new UnsupportedOperationException("unhandled state=" + state);
183      }
184    } catch (Exception e) {
185      if (retryCounter == null) {
186        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
187      }
188      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
189      LOG.warn("Failed to claim replication queues for {}, suspend {} secs", crashedServer,
190        backoff / 1000, e);
191      setTimeout(Math.toIntExact(backoff));
192      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
193      skipPersistence();
194      throw new ProcedureSuspendedException();
195    }
196  }
197
198  @Override
199  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
200    setState(ProcedureProtos.ProcedureState.RUNNABLE);
201    env.getProcedureScheduler().addFront(this);
202    return false;
203  }
204
205  @Override
206  protected void rollbackState(MasterProcedureEnv env, AssignReplicationQueuesState state)
207    throws IOException, InterruptedException {
208    throw new UnsupportedOperationException();
209  }
210
211  @Override
212  protected AssignReplicationQueuesState getState(int stateId) {
213    return AssignReplicationQueuesState.forNumber(stateId);
214  }
215
216  @Override
217  protected int getStateId(AssignReplicationQueuesState state) {
218    return state.getNumber();
219  }
220
221  @Override
222  protected AssignReplicationQueuesState getInitialState() {
223    return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES;
224  }
225
226  @Override
227  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
228    super.serializeStateData(serializer);
229    serializer.serialize(AssignReplicationQueuesStateData.newBuilder()
230      .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build());
231  }
232
233  @Override
234  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
235    super.deserializeStateData(serializer);
236    AssignReplicationQueuesStateData proto =
237      serializer.deserialize(AssignReplicationQueuesStateData.class);
238    crashedServer = ProtobufUtil.toServerName(proto.getCrashedServer());
239  }
240
241}