001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.concurrent.locks.Lock;
022import org.apache.hadoop.hbase.ServerName;
023import org.apache.hadoop.hbase.replication.ReplicationException;
024import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
025import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
026import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
027import org.apache.hadoop.hbase.replication.ReplicationPeers;
028import org.apache.hadoop.hbase.replication.ReplicationUtils;
029import org.apache.hadoop.hbase.util.KeyLocker;
030import org.apache.yetus.audience.InterfaceAudience;
031
032@InterfaceAudience.Private
033public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
034
035  private final ReplicationSourceManager replicationSourceManager;
036  private final KeyLocker<String> peersLock = new KeyLocker<>();
037
038  public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
039    this.replicationSourceManager = replicationSourceManager;
040  }
041
042  @Override
043  public void addPeer(String peerId) throws IOException {
044    Lock peerLock = peersLock.acquireLock(peerId);
045    try {
046      replicationSourceManager.addPeer(peerId);
047    } finally {
048      peerLock.unlock();
049    }
050  }
051
052  @Override
053  public void removePeer(String peerId) throws IOException {
054    Lock peerLock = peersLock.acquireLock(peerId);
055    try {
056      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
057        replicationSourceManager.removePeer(peerId);
058      }
059    } finally {
060      peerLock.unlock();
061    }
062  }
063
064  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
065    PeerState newState;
066    Lock peerLock = peersLock.acquireLock(peerId);
067    ReplicationPeerImpl peer = null;
068    PeerState oldState = null;
069    boolean success = false;
070    try {
071      peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
072      if (peer == null) {
073        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
074      }
075      oldState = peer.getPeerState();
076      newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
077      // RS need to start work with the new replication state change
078      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
079        replicationSourceManager.refreshSources(peerId);
080      }
081      success = true;
082    } finally {
083      if (!success && peer != null) {
084        // Reset peer state if refresh source failed
085        peer.setPeerState(oldState.equals(PeerState.ENABLED));
086      }
087      peerLock.unlock();
088    }
089  }
090
091  @Override
092  public void enablePeer(String peerId) throws ReplicationException, IOException {
093    refreshPeerState(peerId);
094  }
095
096  @Override
097  public void disablePeer(String peerId) throws ReplicationException, IOException {
098    refreshPeerState(peerId);
099  }
100
101  @Override
102  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
103    Lock peerLock = peersLock.acquireLock(peerId);
104    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
105    ReplicationPeerImpl peer = null;
106    ReplicationPeerConfig oldConfig = null;
107    PeerState oldState = null;
108    boolean success = false;
109    try {
110      peer = peers.getPeer(peerId);
111      if (peer == null) {
112        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
113      }
114      oldConfig = peer.getPeerConfig();
115      oldState = peer.getPeerState();
116      ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
117      // also need to refresh peer state here. When updating a serial replication peer we may
118      // disable it first and then enable it.
119      PeerState newState = peers.refreshPeerState(peerId);
120      // RS need to start work with the new replication config change
121      if (
122        !ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig)
123          || oldConfig.isSerial() != newConfig.isSerial()
124          || (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))
125      ) {
126        replicationSourceManager.refreshSources(peerId);
127      }
128      success = true;
129    } finally {
130      if (!success && peer != null) {
131        // Reset peer config if refresh source failed
132        peer.setPeerConfig(oldConfig);
133        peer.setPeerState(oldState.equals(PeerState.ENABLED));
134      }
135      peerLock.unlock();
136    }
137  }
138
139  @Override
140  public void claimReplicationQueue(ServerName crashedServer, String queue)
141    throws ReplicationException, IOException {
142    replicationSourceManager.claimQueue(crashedServer, queue);
143  }
144}