001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.util.Collections; 021import java.util.Map; 022import java.util.Set; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 027import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 028import org.apache.yetus.audience.InterfaceAudience; 029 030/** 031 * This provides an class for maintaining a set of peer clusters. These peers are remote slave 032 * clusters that data is replicated to. 033 */ 034@InterfaceAudience.Private 035public class ReplicationPeers { 036 037 private final Configuration conf; 038 039 // Map of peer clusters keyed by their id 040 private final ConcurrentMap<String, ReplicationPeerImpl> peerCache; 041 private final ReplicationPeerStorage peerStorage; 042 043 ReplicationPeers(ZKWatcher zookeeper, Configuration conf) { 044 this.conf = conf; 045 this.peerCache = new ConcurrentHashMap<>(); 046 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); 047 } 048 049 public Configuration getConf() { 050 return conf; 051 } 052 053 public void init() throws ReplicationException { 054 // Loading all existing peerIds into peer cache. 055 for (String peerId : this.peerStorage.listPeerIds()) { 056 addPeer(peerId); 057 } 058 } 059 060 public ReplicationPeerStorage getPeerStorage() { 061 return this.peerStorage; 062 } 063 064 /** 065 * Method called after a peer has been connected. It will create a ReplicationPeer to track the 066 * newly connected cluster. 067 * @param peerId a short that identifies the cluster 068 * @return whether a ReplicationPeer was successfully created 069 * @throws ReplicationException 070 */ 071 public boolean addPeer(String peerId) throws ReplicationException { 072 if (this.peerCache.containsKey(peerId)) { 073 return false; 074 } 075 076 peerCache.put(peerId, createPeer(peerId)); 077 return true; 078 } 079 080 public void removePeer(String peerId) { 081 peerCache.remove(peerId); 082 } 083 084 /** 085 * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will 086 * continue to track changes to the Peer's state and config. This method returns null if no peer 087 * has been cached with the given peerId. 088 * @param peerId id for the peer 089 * @return ReplicationPeer object 090 */ 091 public ReplicationPeerImpl getPeer(String peerId) { 092 return peerCache.get(peerId); 093 } 094 095 /** 096 * Returns the set of peerIds of the clusters that have been connected and have an underlying 097 * ReplicationPeer. 098 * @return a Set of Strings for peerIds 099 */ 100 public Set<String> getAllPeerIds() { 101 return Collections.unmodifiableSet(peerCache.keySet()); 102 } 103 104 public Map<String, ReplicationPeerImpl> getPeerCache() { 105 return Collections.unmodifiableMap(peerCache); 106 } 107 108 public PeerState refreshPeerState(String peerId) throws ReplicationException { 109 ReplicationPeerImpl peer = peerCache.get(peerId); 110 if (peer == null) { 111 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 112 } 113 peer.setPeerState(peerStorage.isPeerEnabled(peerId)); 114 return peer.getPeerState(); 115 } 116 117 public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException { 118 ReplicationPeerImpl peer = peerCache.get(peerId); 119 if (peer == null) { 120 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 121 } 122 peer.setPeerConfig(peerStorage.getPeerConfig(peerId)); 123 return peer.getPeerConfig(); 124 } 125 126 /** 127 * Helper method to connect to a peer 128 * @param peerId peer's identifier 129 * @return object representing the peer 130 */ 131 private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { 132 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 133 boolean enabled = peerStorage.isPeerEnabled(peerId); 134 return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), 135 peerId, enabled, peerConfig); 136 } 137}