001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.client.TableDescriptor;
028import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
029import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
031import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
032import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
033import org.apache.hadoop.hbase.replication.ReplicationException;
034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
035import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
036import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
037import org.apache.hadoop.hbase.replication.ReplicationUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData;
044
045/**
046 * The procedure for updating the config for a replication peer.
047 */
048@InterfaceAudience.Private
049public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
050
051  private static final Logger LOG = LoggerFactory.getLogger(UpdatePeerConfigProcedure.class);
052
053  private ReplicationPeerConfig peerConfig;
054
055  private ReplicationPeerConfig oldPeerConfig;
056
057  private boolean enabled;
058
059  public UpdatePeerConfigProcedure() {
060  }
061
062  public UpdatePeerConfigProcedure(String peerId, ReplicationPeerConfig peerConfig) {
063    super(peerId);
064    this.peerConfig = peerConfig;
065  }
066
067  @Override
068  public PeerOperationType getPeerOperationType() {
069    return PeerOperationType.UPDATE_CONFIG;
070  }
071
072  private void addToList(List<String> encodedRegionNames, String encodedRegionName,
073    ReplicationQueueStorage queueStorage) throws ReplicationException {
074    encodedRegionNames.add(encodedRegionName);
075    if (encodedRegionNames.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
076      queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
077      encodedRegionNames.clear();
078    }
079  }
080
081  @Override
082  protected PeerModificationState nextStateAfterRefresh() {
083    if (peerConfig.isSerial()) {
084      if (oldPeerConfig.isSerial()) {
085        // both serial, then if the ns/table-cfs configs are not changed, just go with the normal
086        // way, otherwise we need to reopen the regions for the newly added tables.
087        return ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig)
088          ? super.nextStateAfterRefresh()
089          : PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
090      } else {
091        // we change the peer to serial, need to reopen all regions
092        return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
093      }
094    } else {
095      if (oldPeerConfig.isSerial()) {
096        // we remove the serial flag for peer, then we do not need to reopen all regions, but we
097        // need to remove the last pushed sequence ids.
098        return PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID;
099      } else {
100        // not serial for both, just go with the normal way.
101        return super.nextStateAfterRefresh();
102      }
103    }
104  }
105
106  @Override
107  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
108    throws IOException, ReplicationException {
109    if (!oldPeerConfig.isSerial()) {
110      assert peerConfig.isSerial();
111      // change to serial
112      setLastPushedSequenceId(env, peerConfig);
113      return;
114    }
115    if (!peerConfig.isSerial()) {
116      // remove the serial flag
117      env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
118      return;
119    }
120    // enter here means peerConfig and oldPeerConfig are both serial, let's find out the diffs and
121    // process them
122    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
123    Connection conn = env.getMasterServices().getConnection();
124    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
125    List<String> encodedRegionNames = new ArrayList<>();
126    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
127      if (!td.hasGlobalReplicationScope()) {
128        continue;
129      }
130      TableName tn = td.getTableName();
131      if (oldPeerConfig.needToReplicate(tn)) {
132        if (!peerConfig.needToReplicate(tn)) {
133          // removed from peer config
134          for (String encodedRegionName : ReplicationBarrierFamilyFormat
135            .getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
136            addToList(encodedRegionNames, encodedRegionName, queueStorage);
137          }
138        }
139      } else if (peerConfig.needToReplicate(tn)) {
140        // newly added to peer config
141        setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
142      }
143    }
144    if (!encodedRegionNames.isEmpty()) {
145      queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
146    }
147    if (!lastSeqIds.isEmpty()) {
148      queueStorage.setLastSequenceIds(peerId, lastSeqIds);
149    }
150  }
151
152  @Override
153  protected boolean enablePeerBeforeFinish() {
154    // do not need to test reopenRegionsAfterRefresh since we can only enter here if
155    // reopenRegionsAfterRefresh returns true.
156    return enabled;
157  }
158
159  @Override
160  protected ReplicationPeerConfig getOldPeerConfig() {
161    return oldPeerConfig;
162  }
163
164  @Override
165  protected ReplicationPeerConfig getNewPeerConfig() {
166    return peerConfig;
167  }
168
169  @Override
170  protected void prePeerModification(MasterProcedureEnv env) throws IOException {
171    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
172    if (cpHost != null) {
173      cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
174    }
175    ReplicationPeerDescription desc =
176      env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
177    oldPeerConfig = desc.getPeerConfig();
178    enabled = desc.isEnabled();
179  }
180
181  @Override
182  protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
183    env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
184    // if we need to jump to the special states for serial peers, then we need to disable the peer
185    // first if it is not disabled yet.
186    if (enabled && nextStateAfterRefresh() != super.nextStateAfterRefresh()) {
187      env.getReplicationPeerManager().disablePeer(peerId);
188    }
189  }
190
191  @Override
192  protected void postPeerModification(MasterProcedureEnv env)
193    throws IOException, ReplicationException {
194    if (oldPeerConfig.isSerial() && !peerConfig.isSerial()) {
195      env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
196    }
197    LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
198    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
199    if (cpHost != null) {
200      cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);
201    }
202  }
203
204  @Override
205  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
206    super.serializeStateData(serializer);
207    UpdatePeerConfigStateData.Builder builder = UpdatePeerConfigStateData.newBuilder()
208      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
209    if (oldPeerConfig != null) {
210      builder.setOldPeerConfig(ReplicationPeerConfigUtil.convert(oldPeerConfig));
211    }
212    builder.setEnabled(enabled);
213    serializer.serialize(builder.build());
214  }
215
216  @Override
217  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
218    super.deserializeStateData(serializer);
219    UpdatePeerConfigStateData data = serializer.deserialize(UpdatePeerConfigStateData.class);
220    peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
221    if (data.hasOldPeerConfig()) {
222      oldPeerConfig = ReplicationPeerConfigUtil.convert(data.getOldPeerConfig());
223    } else {
224      oldPeerConfig = null;
225    }
226    enabled = data.getEnabled();
227  }
228}