001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.List; 022import org.apache.hadoop.hbase.ServerName; 023import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 024import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 025import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 031import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState; 032import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData; 033 034/** 035 * The procedure for replaying a set of remote wals. It will get an available region server and 036 * schedule a {@link SyncReplicationReplayWALRemoteProcedure} to actually send the request to region 037 * server. 038 */ 039@InterfaceAudience.Private 040public class SyncReplicationReplayWALProcedure 041 extends AbstractPeerNoLockProcedure<SyncReplicationReplayWALState> { 042 043 private static final Logger LOG = 044 LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class); 045 046 private ServerName worker = null; 047 048 private List<String> wals; 049 050 public SyncReplicationReplayWALProcedure() { 051 } 052 053 public SyncReplicationReplayWALProcedure(String peerId, List<String> wals) { 054 this.peerId = peerId; 055 this.wals = wals; 056 } 057 058 @Override 059 protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWALState state) 060 throws ProcedureSuspendedException { 061 SyncReplicationReplayWALManager syncReplicationReplayWALManager = 062 env.getMasterServices().getSyncReplicationReplayWALManager(); 063 switch (state) { 064 case ASSIGN_WORKER: 065 worker = syncReplicationReplayWALManager.acquirePeerWorker(peerId, this); 066 setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER); 067 return Flow.HAS_MORE_STATE; 068 case DISPATCH_WALS_TO_WORKER: 069 addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker)); 070 setNextState(SyncReplicationReplayWALState.RELEASE_WORKER); 071 return Flow.HAS_MORE_STATE; 072 case RELEASE_WORKER: 073 boolean finished = false; 074 try { 075 finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0)); 076 } catch (IOException e) { 077 throw suspend(env.getMasterConfiguration(), 078 backoff -> LOG.warn("Failed to check whether replay wals {} finished for peer id={}" 079 + ", sleep {} secs and retry", wals, peerId, backoff / 1000, e)); 080 } 081 syncReplicationReplayWALManager.releasePeerWorker(peerId, worker, 082 env.getProcedureScheduler()); 083 if (!finished) { 084 LOG.warn("Failed to replay wals {} for peer id={}, retry", wals, peerId); 085 setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); 086 return Flow.HAS_MORE_STATE; 087 } 088 return Flow.NO_MORE_STATE; 089 default: 090 throw new UnsupportedOperationException("unhandled state=" + state); 091 } 092 } 093 094 @Override 095 protected void rollbackState(MasterProcedureEnv env, SyncReplicationReplayWALState state) 096 throws IOException, InterruptedException { 097 if (state == getInitialState()) { 098 return; 099 } 100 throw new UnsupportedOperationException(); 101 } 102 103 @Override 104 protected SyncReplicationReplayWALState getState(int state) { 105 return SyncReplicationReplayWALState.forNumber(state); 106 } 107 108 @Override 109 protected int getStateId(SyncReplicationReplayWALState state) { 110 return state.getNumber(); 111 } 112 113 @Override 114 protected SyncReplicationReplayWALState getInitialState() { 115 return SyncReplicationReplayWALState.ASSIGN_WORKER; 116 } 117 118 @Override 119 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 120 super.serializeStateData(serializer); 121 SyncReplicationReplayWALStateData.Builder builder = 122 SyncReplicationReplayWALStateData.newBuilder().setPeerId(peerId).addAllWal(wals); 123 if (worker != null) { 124 builder.setWorker(ProtobufUtil.toServerName(worker)); 125 } 126 serializer.serialize(builder.build()); 127 } 128 129 @Override 130 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 131 super.deserializeStateData(serializer); 132 SyncReplicationReplayWALStateData data = 133 serializer.deserialize(SyncReplicationReplayWALStateData.class); 134 peerId = data.getPeerId(); 135 wals = data.getWalList(); 136 if (data.hasWorker()) { 137 worker = ProtobufUtil.toServerName(data.getWorker()); 138 } 139 } 140 141 @Override 142 public PeerOperationType getPeerOperationType() { 143 return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL; 144 } 145 146 @Override 147 protected void afterReplay(MasterProcedureEnv env) { 148 // If the procedure is not finished and the worker is not null, we should add it to the used 149 // worker set, to prevent the worker being used by others. 150 if (worker != null && !isFinished()) { 151 env.getMasterServices().getSyncReplicationReplayWALManager().addUsedPeerWorker(peerId, 152 worker); 153 } 154 } 155}