001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 022import org.apache.hadoop.hbase.master.MasterCoprocessorHost; 023import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 024import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 025import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 026import org.apache.hadoop.hbase.replication.ReplicationException; 027import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData; 033import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; 034 035/** 036 * The procedure for adding a new replication peer. 037 */ 038@InterfaceAudience.Private 039public class AddPeerProcedure extends ModifyPeerProcedure { 040 041 private static final Logger LOG = LoggerFactory.getLogger(AddPeerProcedure.class); 042 043 private ReplicationPeerConfig peerConfig; 044 045 private boolean enabled; 046 047 private boolean cleanerDisabled; 048 049 public AddPeerProcedure() { 050 } 051 052 public AddPeerProcedure(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) { 053 super(peerId); 054 this.peerConfig = peerConfig; 055 this.enabled = enabled; 056 } 057 058 @Override 059 public PeerOperationType getPeerOperationType() { 060 return PeerOperationType.ADD; 061 } 062 063 @Override 064 protected PeerModificationState nextStateAfterRefresh() { 065 return peerConfig.isSerial() 066 ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS 067 : super.nextStateAfterRefresh(); 068 } 069 070 @Override 071 protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env) 072 throws IOException, ReplicationException { 073 setLastPushedSequenceId(env, peerConfig); 074 } 075 076 @Override 077 protected boolean enablePeerBeforeFinish() { 078 return enabled; 079 } 080 081 @Override 082 protected ReplicationPeerConfig getNewPeerConfig() { 083 return peerConfig; 084 } 085 086 @Override 087 protected void releaseLatch(MasterProcedureEnv env) { 088 if (cleanerDisabled) { 089 env.getMasterServices().getReplicationLogCleanerBarrier().enable(); 090 } 091 if (peerConfig.isSyncReplication()) { 092 env.getMasterServices().getSyncReplicationPeerLock().release(); 093 } 094 super.releaseLatch(env); 095 } 096 097 @Override 098 protected void prePeerModification(MasterProcedureEnv env) 099 throws IOException, ReplicationException, ProcedureSuspendedException { 100 if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) { 101 throw suspend(env.getMasterConfiguration(), 102 backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs", 103 peerId, backoff / 1000)); 104 } 105 cleanerDisabled = true; 106 MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); 107 if (cpHost != null) { 108 cpHost.preAddReplicationPeer(peerId, peerConfig); 109 } 110 if (peerConfig.isSyncReplication()) { 111 if (!env.getMasterServices().getSyncReplicationPeerLock().tryAcquire()) { 112 throw suspend(env.getMasterConfiguration(), 113 backoff -> LOG.warn( 114 "Can not acquire sync replication peer lock for peer {}, sleep {} secs", peerId, 115 backoff / 1000)); 116 } 117 } 118 env.getReplicationPeerManager().preAddPeer(peerId, peerConfig); 119 } 120 121 @Override 122 protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { 123 env.getReplicationPeerManager().addPeer(peerId, peerConfig, 124 peerConfig.isSerial() ? false : enabled); 125 } 126 127 @Override 128 protected void postPeerModification(MasterProcedureEnv env) 129 throws IOException, ReplicationException { 130 LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId, 131 peerConfig); 132 MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); 133 if (cpHost != null) { 134 env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig); 135 } 136 } 137 138 @Override 139 protected void afterReplay(MasterProcedureEnv env) { 140 if (getCurrentState() == getInitialState()) { 141 // do not need to disable log cleaner or acquire lock if we are in the initial state, later 142 // when executing the procedure we will try to disable and acquire. 143 return; 144 } 145 if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) { 146 throw new IllegalStateException("can not disable log cleaner, this should not happen"); 147 } 148 cleanerDisabled = true; 149 if (peerConfig.isSyncReplication()) { 150 if (!env.getMasterServices().getSyncReplicationPeerLock().tryAcquire()) { 151 throw new IllegalStateException( 152 "Can not acquire sync replication peer lock for peer " + peerId); 153 } 154 } 155 } 156 157 @Override 158 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 159 super.serializeStateData(serializer); 160 serializer.serialize(AddPeerStateData.newBuilder() 161 .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build()); 162 } 163 164 @Override 165 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 166 super.deserializeStateData(serializer); 167 AddPeerStateData data = serializer.deserialize(AddPeerStateData.class); 168 peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); 169 enabled = data.getEnabled(); 170 } 171}