/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AssignReplicationQueuesState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AssignReplicationQueuesStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Procedure for claiming the replication queues of a crashed region server and assigning them to
 * other live region servers, used during server crash processing.
 */
@InterfaceAudience.Private
public class AssignReplicationQueuesProcedure
  extends StateMachineProcedure<MasterProcedureEnv, AssignReplicationQueuesState>
  implements ServerProcedureInterface {

  private static final Logger LOG =
    LoggerFactory.getLogger(AssignReplicationQueuesProcedure.class);

  private ServerName crashedServer;

  private RetryCounter retryCounter;

  public AssignReplicationQueuesProcedure() {
  }

  public AssignReplicationQueuesProcedure(ServerName crashedServer) {
    this.crashedServer = crashedServer;
  }

  @Override
  public ServerName getServerName() {
    return crashedServer;
  }

  @Override
  public boolean hasMetaTableRegion() {
    return false;
  }

  @Override
  public ServerOperationType getServerOperationType() {
    return ServerOperationType.CLAIM_REPLICATION_QUEUES;
  }
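  // Make sure every current replication peer has a queue to claim on the crashed server: for any
  // peer that has no (non-recovered) queue recorded yet, register one whose offset starts at
  // ReplicationGroupOffset.BEGIN so it is also picked up in the claim step.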
  private void addMissingQueues(MasterProcedureEnv env) throws ReplicationException {
    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();

    Set<String> existingQueuePeerIds = new HashSet<>();
    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer);
    for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) {
      ReplicationQueueId queueId = iter.next();
      if (!queueId.isRecovered()) {
        existingQueuePeerIds.add(queueId.getPeerId());
      }
    }
    List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
    for (ReplicationPeerDescription peer : peers) {
      if (!existingQueuePeerIds.contains(peer.getPeerId())) {
        ReplicationQueueId queueId = new ReplicationQueueId(crashedServer, peer.getPeerId());
        LOG.debug("Add replication queue {} for claiming", queueId);
        env.getReplicationPeerManager().getQueueStorage().setOffset(queueId,
          crashedServer.toString(), ReplicationGroupOffset.BEGIN, Collections.emptyMap());
      }
    }
  }
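  // Dispatch ClaimReplicationQueueRemoteProcedure children to move the crashed server's
  // remaining queues to online region servers. Queues belonging to removed peers are skipped,
  // and at most one queue is handed to each target server per round; the procedure stays in
  // the CLAIM state and runs this method again until no queues are left.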
  private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
    Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
      .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
    // filter out replication queues for deleted peers
    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer).stream()
      .filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList());
    if (queueIds.isEmpty()) {
      LOG.debug("Finish claiming replication queues for {}", crashedServer);
      // we are done
      return Flow.NO_MORE_STATE;
    }
    LOG.debug("There are {} replication queues that need to be claimed for {}", queueIds.size(),
      crashedServer);
    List<ServerName> targetServers =
      env.getMasterServices().getServerManager().getOnlineServersList();
    if (targetServers.isEmpty()) {
      throw new ReplicationException("no region server available");
    }
    Collections.shuffle(targetServers);
    for (int i = 0, n = Math.min(queueIds.size(), targetServers.size()); i < n; i++) {
      addChildProcedure(
        new ClaimReplicationQueueRemoteProcedure(queueIds.get(i), targetServers.get(i)));
    }
    retryCounter = null;
    return Flow.HAS_MORE_STATE;
  }

  // Check whether ReplicationSyncUp has already done the work for us; if so, we should skip
  // claiming the replication queues and delete them instead.
  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
    MasterFileSystem mfs = env.getMasterFileSystem();
    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
    return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName()));
  }

  private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
    for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) {
      storage.removeQueue(queueId);
    }
    MasterFileSystem mfs = env.getMasterFileSystem();
    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
    // remove the region server record file
    mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false);
  }

  @Override
  protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    try {
      switch (state) {
        case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
          if (shouldSkip(env)) {
            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
            return Flow.HAS_MORE_STATE;
          } else {
            addMissingQueues(env);
            retryCounter = null;
            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
            return Flow.HAS_MORE_STATE;
          }
        case ASSIGN_REPLICATION_QUEUES_CLAIM:
          if (shouldSkip(env)) {
            retryCounter = null;
            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
            return Flow.HAS_MORE_STATE;
          } else {
            return claimQueues(env);
          }
        case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES:
          removeQueues(env);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (Exception e) {
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("Failed to claim replication queues for {}, suspend {} secs", crashedServer,
        backoff / 1000, e);
      setTimeout(Math.toIntExact(backoff));
      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
      skipPersistence();
      throw new ProcedureSuspendedException();
    }
  }

  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  @Override
  protected void rollbackState(MasterProcedureEnv env, AssignReplicationQueuesState state)
    throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  protected AssignReplicationQueuesState getState(int stateId) {
    return AssignReplicationQueuesState.forNumber(stateId);
  }

  @Override
  protected int getStateId(AssignReplicationQueuesState state) {
    return state.getNumber();
  }

  @Override
  protected AssignReplicationQueuesState getInitialState() {
    return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES;
  }
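  // Only the crashed server name needs to be persisted; the retry counter is transient and is
  // recreated in executeFromState when a retry is needed.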
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    serializer.serialize(AssignReplicationQueuesStateData.newBuilder()
      .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    AssignReplicationQueuesStateData proto =
      serializer.deserialize(AssignReplicationQueuesStateData.class);
    crashedServer = ProtobufUtil.toServerName(proto.getCrashedServer());
  }
}