001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.concurrent.locks.Lock;
022import org.apache.hadoop.hbase.replication.ReplicationException;
023import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
024import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
025import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
026import org.apache.hadoop.hbase.replication.ReplicationPeers;
027import org.apache.hadoop.hbase.replication.ReplicationUtils;
028import org.apache.hadoop.hbase.util.KeyLocker;
029import org.apache.yetus.audience.InterfaceAudience;
030
031@InterfaceAudience.Private
032public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
033
034  private final ReplicationSourceManager replicationSourceManager;
035  private final KeyLocker<String> peersLock = new KeyLocker<>();
036
037  public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
038    this.replicationSourceManager = replicationSourceManager;
039  }
040
041  @Override
042  public void addPeer(String peerId) throws IOException {
043    Lock peerLock = peersLock.acquireLock(peerId);
044    try {
045      replicationSourceManager.addPeer(peerId);
046    } finally {
047      peerLock.unlock();
048    }
049  }
050
051  @Override
052  public void removePeer(String peerId) throws IOException {
053    Lock peerLock = peersLock.acquireLock(peerId);
054    try {
055      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
056        replicationSourceManager.removePeer(peerId);
057      }
058    } finally {
059      peerLock.unlock();
060    }
061  }
062
063  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
064    PeerState newState;
065    Lock peerLock = peersLock.acquireLock(peerId);
066    ReplicationPeerImpl peer = null;
067    PeerState oldState = null;
068    boolean success = false;
069    try {
070      peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
071      if (peer == null) {
072        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
073      }
074      oldState = peer.getPeerState();
075      newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
076      // RS need to start work with the new replication state change
077      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
078        replicationSourceManager.refreshSources(peerId);
079      }
080      success = true;
081    } finally {
082      if (!success && peer != null) {
083        // Reset peer state if refresh source failed
084        peer.setPeerState(oldState.equals(PeerState.ENABLED));
085      }
086      peerLock.unlock();
087    }
088  }
089
090  @Override
091  public void enablePeer(String peerId) throws ReplicationException, IOException {
092    refreshPeerState(peerId);
093  }
094
095  @Override
096  public void disablePeer(String peerId) throws ReplicationException, IOException {
097    refreshPeerState(peerId);
098  }
099
100  @Override
101  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
102    Lock peerLock = peersLock.acquireLock(peerId);
103    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
104    ReplicationPeerImpl peer = null;
105    ReplicationPeerConfig oldConfig = null;
106    PeerState oldState = null;
107    boolean success = false;
108    try {
109      peer = peers.getPeer(peerId);
110      if (peer == null) {
111        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
112      }
113      oldConfig = peer.getPeerConfig();
114      oldState = peer.getPeerState();
115      ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
116      // also need to refresh peer state here. When updating a serial replication peer we may
117      // disable it first and then enable it.
118      PeerState newState = peers.refreshPeerState(peerId);
119      // RS need to start work with the new replication config change
120      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) ||
121        oldConfig.isSerial() != newConfig.isSerial() ||
122        (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))) {
123        replicationSourceManager.refreshSources(peerId);
124      }
125      success = true;
126    } finally {
127      if (!success && peer != null) {
128        // Reset peer config if refresh source failed
129        peer.setPeerConfig(oldConfig);
130        peer.setPeerState(oldState.equals(PeerState.ENABLED));
131      }
132      peerLock.unlock();
133    }
134  }
135}