/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * This provides a class for maintaining a set of peer clusters. These peers are remote slave
 * clusters that data is replicated to.
 * <p>
 * Peers are held in an in-memory cache keyed by peer id; their config and enabled flag are read
 * from a {@link ReplicationPeerStorage}.
 */
@InterfaceAudience.Private
public class ReplicationPeers {

  private final Configuration conf;

  // Map of peer clusters keyed by their id
  private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
  private final ReplicationPeerStorage peerStorage;

  /**
   * Creates an empty peer cache backed by the peer storage obtained from
   * {@link ReplicationStorageFactory}. No peers are loaded until {@link #init()} is called.
   * @param zookeeper watcher handed to the storage factory
   * @param conf      configuration handed to the storage factory and used when creating peers
   */
  ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
    this.conf = conf;
    this.peerCache = new ConcurrentHashMap<>();
    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
  }

  /** Returns the configuration this instance was constructed with. */
  public Configuration getConf() {
    return conf;
  }

  /**
   * Loads every peer id listed by the peer storage into the peer cache.
   * @throws ReplicationException if listing the peer ids or creating one of the peers fails
   */
  public void init() throws ReplicationException {
    // Loading all existing peerIds into peer cache.
    for (String peerId : this.peerStorage.listPeerIds()) {
      addPeer(peerId);
    }
  }

  /** Returns the peer storage used by this instance. */
  public ReplicationPeerStorage getPeerStorage() {
    return this.peerStorage;
  }

  /**
   * Method called after a peer has been connected. It will create a ReplicationPeer to track the
   * newly connected cluster.
   * @param peerId the id that identifies the peer cluster
   * @return {@code true} if a new ReplicationPeer was created and cached, {@code false} if a peer
   *         with this id was already cached
   * @throws ReplicationException if the peer could not be created from the peer storage
   */
  public boolean addPeer(String peerId) throws ReplicationException {
    // Cheap early exit so storage is not read for a peer that is already cached.
    if (this.peerCache.containsKey(peerId)) {
      return false;
    }

    ReplicationPeerImpl newPeer = createPeer(peerId);
    // putIfAbsent instead of put: if another caller cached the same id between the check above
    // and this point, keep the already-cached instance and report that nothing was added.
    return peerCache.putIfAbsent(peerId, newPeer) == null;
  }

  /**
   * Removes the peer with the given id from the cache. Does nothing if no such peer is cached.
   * @param peerId id of the peer to drop from the cache
   */
  public void removePeer(String peerId) {
    peerCache.remove(peerId);
  }

  /**
   * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
   * continue to track changes to the Peer's state and config. This method returns null if no peer
   * has been cached with the given peerId.
   * @param peerId id for the peer
   * @return ReplicationPeer object, or {@code null} if the peer is not cached
   */
  public ReplicationPeerImpl getPeer(String peerId) {
    return peerCache.get(peerId);
  }

  /**
   * Returns the set of peerIds of the clusters that have been connected and have an underlying
   * ReplicationPeer.
   * @return an unmodifiable view of the cached peerIds
   */
  public Set<String> getAllPeerIds() {
    return Collections.unmodifiableSet(peerCache.keySet());
  }

  /**
   * Returns the peer cache.
   * @return an unmodifiable view of the peerId to ReplicationPeerImpl map
   */
  public Map<String, ReplicationPeerImpl> getPeerCache() {
    return Collections.unmodifiableMap(peerCache);
  }

  /**
   * Re-reads the enabled flag of a cached peer from the peer storage and applies it to the cached
   * peer.
   * @param peerId id of a cached peer
   * @return the peer's state after the refresh
   * @throws ReplicationException if the peer is not cached or the storage read fails
   */
  public PeerState refreshPeerState(String peerId) throws ReplicationException {
    ReplicationPeerImpl peer = getCachedPeerOrThrow(peerId);
    peer.setPeerState(peerStorage.isPeerEnabled(peerId));
    return peer.getPeerState();
  }

  /**
   * Re-reads the config of a cached peer from the peer storage and applies it to the cached peer.
   * @param peerId id of a cached peer
   * @return the peer's config after the refresh
   * @throws ReplicationException if the peer is not cached or the storage read fails
   */
  public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
    ReplicationPeerImpl peer = getCachedPeerOrThrow(peerId);
    peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
    return peer.getPeerConfig();
  }

  /**
   * Looks up a peer in the cache, failing if it is absent.
   * @param peerId id of the peer to look up
   * @return the cached peer, never {@code null}
   * @throws ReplicationException if no peer with this id is cached
   */
  private ReplicationPeerImpl getCachedPeerOrThrow(String peerId) throws ReplicationException {
    ReplicationPeerImpl peer = peerCache.get(peerId);
    if (peer == null) {
      throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
    }
    return peer;
  }

  /**
   * Helper method that builds a peer object from the config and enabled flag held in the peer
   * storage.
   * @param peerId peer's identifier
   * @return object representing the peer
   * @throws ReplicationException if reading the peer from storage or deriving its cluster
   *                              configuration fails
   */
  private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
    boolean enabled = peerStorage.isPeerEnabled(peerId);
    return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
      peerId, enabled, peerConfig);
  }
}