/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.List;
024import java.util.stream.Collectors;
025import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
026import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
027import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
030import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
031import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
032import org.apache.hadoop.hbase.replication.ReplicationException;
033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RemovePeerStateData;
039
040/**
041 * The procedure for removing a replication peer.
042 */
043@InterfaceAudience.Private
044public class RemovePeerProcedure extends ModifyPeerProcedure {
045
046  private static final Logger LOG = LoggerFactory.getLogger(RemovePeerProcedure.class);
047
048  private ReplicationPeerConfig peerConfig;
049
050  private List<Long> ongoingAssignReplicationQueuesProcIds = Collections.emptyList();
051
052  public RemovePeerProcedure() {
053  }
054
055  public RemovePeerProcedure(String peerId) {
056    super(peerId);
057  }
058
059  @Override
060  public PeerOperationType getPeerOperationType() {
061    return PeerOperationType.REMOVE;
062  }
063
064  @Override
065  protected void prePeerModification(MasterProcedureEnv env) throws IOException {
066    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
067    if (cpHost != null) {
068      cpHost.preRemoveReplicationPeer(peerId);
069    }
070    peerConfig = env.getReplicationPeerManager().preRemovePeer(peerId);
071  }
072
073  @Override
074  protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
075    env.getReplicationPeerManager().removePeer(peerId);
076    // record ongoing AssignReplicationQueuesProcedures after we update the peer storage
077    ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor()
078      .getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure)
079      .filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList());
080  }
081
082  private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
083    env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
084  }
085
086  private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env)
087    throws ProcedureSuspendedException {
088    if (ongoingAssignReplicationQueuesProcIds.isEmpty()) {
089      LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on",
090        peerId);
091    }
092    ProcedureExecutor<MasterProcedureEnv> procExec =
093      env.getMasterServices().getMasterProcedureExecutor();
094    long[] unfinishedProcIds =
095      ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure)
096        .filter(p -> p != null && !p.isFinished()).mapToLong(Procedure::getProcId).toArray();
097    if (unfinishedProcIds.length == 0) {
098      LOG.info(
099        "All assign replication queues procedures are finished when removing peer {}, move on",
100        peerId);
101    } else {
102      throw suspend(env.getMasterConfiguration(), backoff -> LOG.info(
103        "There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs",
104        unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000));
105    }
106  }
107
108  @Override
109  protected void postPeerModification(MasterProcedureEnv env)
110    throws IOException, ReplicationException, ProcedureSuspendedException {
111    checkAssignReplicationQueuesFinished(env);
112
113    if (peerConfig.isSyncReplication()) {
114      removeRemoteWALs(env);
115    }
116    env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
117    if (peerConfig.isSerial()) {
118      env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
119    }
120    LOG.info("Successfully removed peer {}", peerId);
121    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
122    if (cpHost != null) {
123      cpHost.postRemoveReplicationPeer(peerId);
124    }
125  }
126
127  @Override
128  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
129    super.serializeStateData(serializer);
130    RemovePeerStateData.Builder builder = RemovePeerStateData.newBuilder();
131    if (peerConfig != null) {
132      builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
133    }
134    builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds);
135    serializer.serialize(builder.build());
136  }
137
138  @Override
139  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
140    super.deserializeStateData(serializer);
141    RemovePeerStateData data = serializer.deserialize(RemovePeerStateData.class);
142    if (data.hasPeerConfig()) {
143      this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
144    }
145    ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList();
146  }
147}