001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.util.Collections; 021import java.util.Map; 022import java.util.Set; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 027import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 028import org.apache.yetus.audience.InterfaceAudience; 029 030import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 031 032/** 033 * This provides an class for maintaining a set of peer clusters. These peers are remote slave 034 * clusters that data is replicated to. 035 */ 036@InterfaceAudience.Private 037public class ReplicationPeers { 038 039 private final Configuration conf; 040 041 // Map of peer clusters keyed by their id 042 private final ConcurrentMap<String, ReplicationPeerImpl> peerCache; 043 private final ReplicationPeerStorage peerStorage; 044 045 ReplicationPeers(ZKWatcher zookeeper, Configuration conf) { 046 this.conf = conf; 047 this.peerCache = new ConcurrentHashMap<>(); 048 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); 049 } 050 051 public Configuration getConf() { 052 return conf; 053 } 054 055 public void init() throws ReplicationException { 056 // Loading all existing peerIds into peer cache. 057 for (String peerId : this.peerStorage.listPeerIds()) { 058 addPeer(peerId); 059 } 060 } 061 062 @VisibleForTesting 063 public ReplicationPeerStorage getPeerStorage() { 064 return this.peerStorage; 065 } 066 067 /** 068 * Method called after a peer has been connected. It will create a ReplicationPeer to track the 069 * newly connected cluster. 070 * @param peerId a short that identifies the cluster 071 * @return whether a ReplicationPeer was successfully created 072 * @throws ReplicationException 073 */ 074 public boolean addPeer(String peerId) throws ReplicationException { 075 if (this.peerCache.containsKey(peerId)) { 076 return false; 077 } 078 079 peerCache.put(peerId, createPeer(peerId)); 080 return true; 081 } 082 083 public void removePeer(String peerId) { 084 peerCache.remove(peerId); 085 } 086 087 /** 088 * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will 089 * continue to track changes to the Peer's state and config. This method returns null if no peer 090 * has been cached with the given peerId. 091 * @param peerId id for the peer 092 * @return ReplicationPeer object 093 */ 094 public ReplicationPeerImpl getPeer(String peerId) { 095 return peerCache.get(peerId); 096 } 097 098 /** 099 * Returns the set of peerIds of the clusters that have been connected and have an underlying 100 * ReplicationPeer. 101 * @return a Set of Strings for peerIds 102 */ 103 public Set<String> getAllPeerIds() { 104 return Collections.unmodifiableSet(peerCache.keySet()); 105 } 106 107 public Map<String, ReplicationPeerImpl> getPeerCache() { 108 return Collections.unmodifiableMap(peerCache); 109 } 110 111 public PeerState refreshPeerState(String peerId) throws ReplicationException { 112 ReplicationPeerImpl peer = peerCache.get(peerId); 113 if (peer == null) { 114 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 115 } 116 peer.setPeerState(peerStorage.isPeerEnabled(peerId)); 117 return peer.getPeerState(); 118 } 119 120 public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException { 121 ReplicationPeerImpl peer = peerCache.get(peerId); 122 if (peer == null) { 123 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 124 } 125 peer.setPeerConfig(peerStorage.getPeerConfig(peerId)); 126 return peer.getPeerConfig(); 127 } 128 129 /** 130 * Helper method to connect to a peer 131 * @param peerId peer's identifier 132 * @return object representing the peer 133 */ 134 private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { 135 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 136 boolean enabled = peerStorage.isPeerEnabled(peerId); 137 return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), 138 peerId, enabled, peerConfig); 139 } 140}