001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.client.TableDescriptor; 028import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 029import org.apache.hadoop.hbase.master.MasterCoprocessorHost; 030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 031import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 032import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat; 033import org.apache.hadoop.hbase.replication.ReplicationException; 034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 035import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 036import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 037import org.apache.hadoop.hbase.replication.ReplicationUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData; 044 045/** 046 * The procedure for updating the config for a replication peer. 047 */ 048@InterfaceAudience.Private 049public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { 050 051 private static final Logger LOG = LoggerFactory.getLogger(UpdatePeerConfigProcedure.class); 052 053 private ReplicationPeerConfig peerConfig; 054 055 private ReplicationPeerConfig oldPeerConfig; 056 057 private boolean enabled; 058 059 public UpdatePeerConfigProcedure() { 060 } 061 062 public UpdatePeerConfigProcedure(String peerId, ReplicationPeerConfig peerConfig) { 063 super(peerId); 064 this.peerConfig = peerConfig; 065 } 066 067 @Override 068 public PeerOperationType getPeerOperationType() { 069 return PeerOperationType.UPDATE_CONFIG; 070 } 071 072 private void addToList(List<String> encodedRegionNames, String encodedRegionName, 073 ReplicationQueueStorage queueStorage) throws ReplicationException { 074 encodedRegionNames.add(encodedRegionName); 075 if (encodedRegionNames.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) { 076 queueStorage.removeLastSequenceIds(peerId, encodedRegionNames); 077 encodedRegionNames.clear(); 078 } 079 } 080 081 @Override 082 protected PeerModificationState nextStateAfterRefresh() { 083 if (peerConfig.isSerial()) { 084 if (oldPeerConfig.isSerial()) { 085 // both serial, then if the ns/table-cfs configs are not changed, just go with the normal 086 // way, otherwise we need to reopen the regions for the newly added tables. 087 return ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig) 088 ? super.nextStateAfterRefresh() 089 : PeerModificationState.SERIAL_PEER_REOPEN_REGIONS; 090 } else { 091 // we change the peer to serial, need to reopen all regions 092 return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS; 093 } 094 } else { 095 if (oldPeerConfig.isSerial()) { 096 // we remove the serial flag for peer, then we do not need to reopen all regions, but we 097 // need to remove the last pushed sequence ids. 098 return PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID; 099 } else { 100 // not serial for both, just go with the normal way. 101 return super.nextStateAfterRefresh(); 102 } 103 } 104 } 105 106 @Override 107 protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env) 108 throws IOException, ReplicationException { 109 if (!oldPeerConfig.isSerial()) { 110 assert peerConfig.isSerial(); 111 // change to serial 112 setLastPushedSequenceId(env, peerConfig); 113 return; 114 } 115 if (!peerConfig.isSerial()) { 116 // remove the serial flag 117 env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId); 118 return; 119 } 120 // enter here means peerConfig and oldPeerConfig are both serial, let's find out the diffs and 121 // process them 122 ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage(); 123 Connection conn = env.getMasterServices().getConnection(); 124 Map<String, Long> lastSeqIds = new HashMap<String, Long>(); 125 List<String> encodedRegionNames = new ArrayList<>(); 126 for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) { 127 if (!td.hasGlobalReplicationScope()) { 128 continue; 129 } 130 TableName tn = td.getTableName(); 131 if (oldPeerConfig.needToReplicate(tn)) { 132 if (!peerConfig.needToReplicate(tn)) { 133 // removed from peer config 134 for (String encodedRegionName : ReplicationBarrierFamilyFormat 135 .getTableEncodedRegionNamesForSerialReplication(conn, tn)) { 136 addToList(encodedRegionNames, encodedRegionName, queueStorage); 137 } 138 } 139 } else if (peerConfig.needToReplicate(tn)) { 140 // newly added to peer config 141 setLastPushedSequenceIdForTable(env, tn, lastSeqIds); 142 } 143 } 144 if (!encodedRegionNames.isEmpty()) { 145 queueStorage.removeLastSequenceIds(peerId, encodedRegionNames); 146 } 147 if (!lastSeqIds.isEmpty()) { 148 queueStorage.setLastSequenceIds(peerId, lastSeqIds); 149 } 150 } 151 152 @Override 153 protected boolean enablePeerBeforeFinish() { 154 // do not need to test reopenRegionsAfterRefresh since we can only enter here if 155 // reopenRegionsAfterRefresh returns true. 156 return enabled; 157 } 158 159 @Override 160 protected ReplicationPeerConfig getOldPeerConfig() { 161 return oldPeerConfig; 162 } 163 164 @Override 165 protected ReplicationPeerConfig getNewPeerConfig() { 166 return peerConfig; 167 } 168 169 @Override 170 protected void prePeerModification(MasterProcedureEnv env) throws IOException { 171 MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); 172 if (cpHost != null) { 173 cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig); 174 } 175 ReplicationPeerDescription desc = 176 env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig); 177 oldPeerConfig = desc.getPeerConfig(); 178 enabled = desc.isEnabled(); 179 } 180 181 @Override 182 protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { 183 env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig); 184 // if we need to jump to the special states for serial peers, then we need to disable the peer 185 // first if it is not disabled yet. 186 if (enabled && nextStateAfterRefresh() != super.nextStateAfterRefresh()) { 187 env.getReplicationPeerManager().disablePeer(peerId); 188 } 189 } 190 191 @Override 192 protected void postPeerModification(MasterProcedureEnv env) 193 throws IOException, ReplicationException { 194 if (oldPeerConfig.isSerial() && !peerConfig.isSerial()) { 195 env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId); 196 } 197 LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig); 198 MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); 199 if (cpHost != null) { 200 cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig); 201 } 202 } 203 204 @Override 205 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 206 super.serializeStateData(serializer); 207 UpdatePeerConfigStateData.Builder builder = UpdatePeerConfigStateData.newBuilder() 208 .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); 209 if (oldPeerConfig != null) { 210 builder.setOldPeerConfig(ReplicationPeerConfigUtil.convert(oldPeerConfig)); 211 } 212 builder.setEnabled(enabled); 213 serializer.serialize(builder.build()); 214 } 215 216 @Override 217 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 218 super.deserializeStateData(serializer); 219 UpdatePeerConfigStateData data = serializer.deserialize(UpdatePeerConfigStateData.class); 220 peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); 221 if (data.hasOldPeerConfig()) { 222 oldPeerConfig = ReplicationPeerConfigUtil.convert(data.getOldPeerConfig()); 223 } else { 224 oldPeerConfig = null; 225 } 226 enabled = data.getEnabled(); 227 } 228}