001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Set; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 028import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; 029import org.apache.hadoop.hbase.procedure2.Procedure; 030import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 031import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 032import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 033import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 034import org.apache.hadoop.hbase.replication.ReplicationException; 035import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 036import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 037import org.apache.hadoop.hbase.replication.ReplicationQueueId; 038import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 039import org.apache.hadoop.hbase.util.RetryCounter; 040import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 048 049/** 050 * Used to assign the replication queues of a dead server to other region servers. 051 * @deprecated Use {@link AssignReplicationQueuesProcedure} instead, kept only for keeping 052 * compatibility. 053 */ 054@Deprecated 055@InterfaceAudience.Private 056public class ClaimReplicationQueuesProcedure extends Procedure<MasterProcedureEnv> 057 implements ServerProcedureInterface { 058 059 private static final Logger LOG = LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class); 060 061 private ServerName crashedServer; 062 063 private RetryCounter retryCounter; 064 065 public ClaimReplicationQueuesProcedure() { 066 } 067 068 public ClaimReplicationQueuesProcedure(ServerName crashedServer) { 069 this.crashedServer = crashedServer; 070 } 071 072 @Override 073 public ServerName getServerName() { 074 return crashedServer; 075 } 076 077 @Override 078 public boolean hasMetaTableRegion() { 079 return false; 080 } 081 082 @Override 083 public ServerOperationType getServerOperationType() { 084 return ServerOperationType.CLAIM_REPLICATION_QUEUES; 085 } 086 087 @Override 088 protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 089 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 090 ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage(); 091 try { 092 List<ReplicationQueueId> queues = storage.listAllQueueIds(crashedServer); 093 Set<String> existQueuePeerIds = new HashSet<>(); 094 // this is for upgrading to the new region replication framework, where we will delete the 095 // legacy region_replica_replication peer directly, without deleting the replication queues 096 for (Iterator<ReplicationQueueId> iter = queues.iterator(); iter.hasNext();) { 097 ReplicationQueueId queueId = iter.next(); 098 if (queueId.getPeerId().equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)) { 099 LOG.info("Found replication queue {} for legacy region replication peer, " 100 + "skipping claiming and removing...", queueId); 101 iter.remove(); 102 storage.removeQueue(queueId); 103 } else if (!queueId.isRecovered()) { 104 existQueuePeerIds.add(queueId.getPeerId()); 105 } 106 } 107 List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null); 108 // TODO: the implementation is not enough yet, if there are retries, we need to know whether 109 // the replication queue for the given peer has been claimed or not, otherwise this logic will 110 // introduce redundant replication queues for the same peer. Add this logic to make some UTs 111 // pass first. 112 for (ReplicationPeerDescription peer : peers) { 113 if (!existQueuePeerIds.contains(peer.getPeerId())) { 114 ReplicationQueueId queueId = new ReplicationQueueId(crashedServer, peer.getPeerId()); 115 env.getReplicationPeerManager().getQueueStorage().setOffset(queueId, 116 crashedServer.toString(), ReplicationGroupOffset.BEGIN, Collections.emptyMap()); 117 queues.add(queueId); 118 } 119 } 120 if (queues.isEmpty()) { 121 LOG.debug("Finish claiming replication queues for {}", crashedServer); 122 // we are done 123 return null; 124 } 125 LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(), 126 crashedServer); 127 List<ServerName> targetServers = 128 env.getMasterServices().getServerManager().getOnlineServersList(); 129 if (targetServers.isEmpty()) { 130 throw new ReplicationException("no region server available"); 131 } 132 Collections.shuffle(targetServers); 133 ClaimReplicationQueueRemoteProcedure[] procs = 134 new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(), targetServers.size())]; 135 for (int i = 0; i < procs.length; i++) { 136 procs[i] = new ClaimReplicationQueueRemoteProcedure(queues.get(i), targetServers.get(i)); 137 } 138 return procs; 139 } catch (ReplicationException e) { 140 if (retryCounter == null) { 141 retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); 142 } 143 long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); 144 LOG.warn("Failed to claim replication queues for {}, suspend {}secs {}; {};", crashedServer, 145 backoff / 1000, e); 146 setTimeout(Math.toIntExact(backoff)); 147 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 148 skipPersistence(); 149 throw new ProcedureSuspendedException(); 150 } 151 } 152 153 @Override 154 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 155 setState(ProcedureProtos.ProcedureState.RUNNABLE); 156 env.getProcedureScheduler().addFront(this); 157 return false; 158 } 159 160 @Override 161 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 162 throw new UnsupportedOperationException(); 163 } 164 165 @Override 166 protected boolean abort(MasterProcedureEnv env) { 167 return false; 168 } 169 170 @Override 171 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 172 serializer.serialize(ClaimReplicationQueuesStateData.newBuilder() 173 .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build()); 174 } 175 176 @Override 177 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 178 ClaimReplicationQueuesStateData data = 179 serializer.deserialize(ClaimReplicationQueuesStateData.class); 180 crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); 181 } 182}