/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Used to assign the replication queues of a dead server to other region servers.
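 * <p>
 * It is typically scheduled while handling a crashed region server; a minimal sketch of such a
 * caller (illustrative only) would be:
 *
 * <pre>
 * // inside a server crash handling procedure
 * addChildProcedure(new ClaimReplicationQueuesProcedure(crashedServerName));
 * </pre>
 *
 * If claiming fails, for example because the replication queue storage is temporarily
 * unavailable, the procedure suspends itself and retries later with a backoff computed from a
 * {@link RetryCounter}.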
 */
@InterfaceAudience.Private
public class ClaimReplicationQueuesProcedure extends Procedure<MasterProcedureEnv>
  implements ServerProcedureInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class);

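  // the dead region server whose replication queues need to be claimed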
  private ServerName crashedServer;

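  // counts retry attempts and provides the backoff time used when claiming has to be retried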
  private RetryCounter retryCounter;

  public ClaimReplicationQueuesProcedure() {
  }

  public ClaimReplicationQueuesProcedure(ServerName crashedServer) {
    this.crashedServer = crashedServer;
  }

  @Override
  public ServerName getServerName() {
    return crashedServer;
  }

  @Override
  public boolean hasMetaTableRegion() {
    return false;
  }

  @Override
  public ServerOperationType getServerOperationType() {
    return ServerOperationType.CLAIM_REPLICATION_QUEUES;
  }

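  /**
   * Claims all replication queues of the crashed server: queues that belong to the legacy
   * region_replica_replication peer are removed directly, and the remaining queues are assigned
   * to online region servers via {@link ClaimReplicationQueueRemoteProcedure}. If the queue
   * storage cannot be accessed, the procedure suspends itself and retries with backoff.
   */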
  @Override
  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
    try {
      List<String> queues = storage.getAllQueues(crashedServer);
      // this is for upgrading to the new region replication framework, where we will delete the
      // legacy region_replica_replication peer directly, without deleting the replication queues,
      // as they may still be used by region servers which have not been upgraded yet.
      for (Iterator<String> iter = queues.iterator(); iter.hasNext();) {
        ReplicationQueueInfo queue = new ReplicationQueueInfo(iter.next());
        if (queue.getPeerId().equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)) {
          LOG.info("Found replication queue {} for the legacy region replication peer, "
            + "removing it directly without claiming", queue.getQueueId());
          iter.remove();
          storage.removeQueue(crashedServer, queue.getQueueId());
        }
      }
      if (queues.isEmpty()) {
        LOG.debug("Finished claiming replication queues for {}", crashedServer);
        storage.removeReplicatorIfQueueIsEmpty(crashedServer);
        // we are done
        return null;
      }
      LOG.debug("There are {} replication queues that need to be claimed for {}", queues.size(),
        crashedServer);
      List<ServerName> targetServers =
        env.getMasterServices().getServerManager().getOnlineServersList();
      if (targetServers.isEmpty()) {
        throw new ReplicationException("no region server available");
      }
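      // shuffle the online servers so that queues from different crashed servers are spread
      // across the cluster instead of always landing on the same region servers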
      Collections.shuffle(targetServers);
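      // hand out at most one queue per target server in this round; when the remote procedures
      // finish, this procedure runs again and picks up any queues that are still unclaimed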
      ClaimReplicationQueueRemoteProcedure[] procs =
        new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(), targetServers.size())];
      for (int i = 0; i < procs.length; i++) {
        procs[i] = new ClaimReplicationQueueRemoteProcedure(crashedServer, queues.get(i),
          targetServers.get(i));
      }
      return procs;
    } catch (ReplicationException e) {
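      // the replication queue storage could not be read or updated; suspend the procedure and
      // retry after a backoff instead of failing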
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("Failed to claim replication queues for {}, suspend {}secs", crashedServer,
        backoff / 1000, e);
      setTimeout(Math.toIntExact(backoff));
      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
      skipPersistence();
      throw new ProcedureSuspendedException();
    }
  }

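  // the timeout set in execute() is only meant to wake this procedure up again: instead of
  // treating it as a failure, go back to RUNNABLE and put the procedure at the front of the queue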
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

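  // claiming queues is a forward-only operation, so rollback is not supported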
  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }

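  // only the crashed server name needs to be persisted; the retry counter is transient and
  // restarts from the first attempt if the master restarts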
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    serializer.serialize(ClaimReplicationQueuesStateData.newBuilder()
      .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    ClaimReplicationQueuesStateData data =
      serializer.deserialize(ClaimReplicationQueuesStateData.class);
    crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
  }
}