001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.concurrent.locks.Lock; 022import org.apache.hadoop.hbase.replication.ReplicationException; 023import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 024import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 025import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; 026import org.apache.hadoop.hbase.replication.ReplicationPeers; 027import org.apache.hadoop.hbase.replication.ReplicationUtils; 028import org.apache.hadoop.hbase.util.KeyLocker; 029import org.apache.yetus.audience.InterfaceAudience; 030 031@InterfaceAudience.Private 032public class PeerProcedureHandlerImpl implements PeerProcedureHandler { 033 034 private final ReplicationSourceManager replicationSourceManager; 035 private final KeyLocker<String> peersLock = new KeyLocker<>(); 036 037 public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) { 038 this.replicationSourceManager = replicationSourceManager; 039 } 040 041 @Override 042 public void addPeer(String peerId) throws IOException { 043 Lock peerLock = peersLock.acquireLock(peerId); 044 try { 045 replicationSourceManager.addPeer(peerId); 046 } finally { 047 peerLock.unlock(); 048 } 049 } 050 051 @Override 052 public void removePeer(String peerId) throws IOException { 053 Lock peerLock = peersLock.acquireLock(peerId); 054 try { 055 if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) { 056 replicationSourceManager.removePeer(peerId); 057 } 058 } finally { 059 peerLock.unlock(); 060 } 061 } 062 063 private void refreshPeerState(String peerId) throws ReplicationException, IOException { 064 PeerState newState; 065 Lock peerLock = peersLock.acquireLock(peerId); 066 ReplicationPeerImpl peer = null; 067 PeerState oldState = null; 068 boolean success = false; 069 try { 070 peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); 071 if (peer == null) { 072 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 073 } 074 oldState = peer.getPeerState(); 075 newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); 076 // RS need to start work with the new replication state change 077 if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { 078 replicationSourceManager.refreshSources(peerId); 079 } 080 success = true; 081 } finally { 082 if (!success && peer != null) { 083 // Reset peer state if refresh source failed 084 peer.setPeerState(oldState.equals(PeerState.ENABLED)); 085 } 086 peerLock.unlock(); 087 } 088 } 089 090 @Override 091 public void enablePeer(String peerId) throws ReplicationException, IOException { 092 refreshPeerState(peerId); 093 } 094 095 @Override 096 public void disablePeer(String peerId) throws ReplicationException, IOException { 097 refreshPeerState(peerId); 098 } 099 100 @Override 101 public void updatePeerConfig(String peerId) throws ReplicationException, IOException { 102 Lock peerLock = peersLock.acquireLock(peerId); 103 ReplicationPeers peers = replicationSourceManager.getReplicationPeers(); 104 ReplicationPeerImpl peer = null; 105 ReplicationPeerConfig oldConfig = null; 106 PeerState oldState = null; 107 boolean success = false; 108 try { 109 peer = peers.getPeer(peerId); 110 if (peer == null) { 111 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 112 } 113 oldConfig = peer.getPeerConfig(); 114 oldState = peer.getPeerState(); 115 ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId); 116 // also need to refresh peer state here. When updating a serial replication peer we may 117 // disable it first and then enable it. 118 PeerState newState = peers.refreshPeerState(peerId); 119 // RS need to start work with the new replication config change 120 if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) || 121 oldConfig.isSerial() != newConfig.isSerial() || 122 (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))) { 123 replicationSourceManager.refreshSources(peerId); 124 } 125 success = true; 126 } finally { 127 if (!success && peer != null) { 128 // Reset peer config if refresh source failed 129 peer.setPeerConfig(oldConfig); 130 peer.setPeerState(oldState.equals(PeerState.ENABLED)); 131 } 132 peerLock.unlock(); 133 } 134 } 135}