/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This provides a class for maintaining a set of peer clusters. These peers are remote slave
 * clusters that data is replicated to.
 * <p>
 * We implement {@link ConfigurationObserver} mainly for recreating the
 * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
 * restarting the region server.
 */
@InterfaceAudience.Private
public class ReplicationPeers implements ConfigurationObserver {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);

  // Replaced wholesale by onConfigurationChange, hence volatile.
  private volatile Configuration conf;

  // Map of peer clusters keyed by their id
  private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
  private final FileSystem fs;
  private final ZKWatcher zookeeper;
  // Recreated by onConfigurationChange, hence volatile.
  private volatile ReplicationPeerStorage peerStorage;

  ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
    this.conf = conf;
    this.fs = fs;
    this.zookeeper = zookeeper;
    this.peerCache = new ConcurrentHashMap<>();
    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
  }

  /**
   * Returns the configuration currently in use, i.e. the one passed to the constructor or to the
   * most recent {@link #onConfigurationChange(Configuration)} call.
   */
  public Configuration getConf() {
    return conf;
  }

  /**
   * Loads every peer id listed by the peer storage into the peer cache.
   * @throws ReplicationException if listing the peer ids or creating one of the peers fails
   */
  public void init() throws ReplicationException {
    // Loading all existing peerIds into peer cache.
    for (String peerId : this.peerStorage.listPeerIds()) {
      addPeer(peerId);
    }
  }

  /** Returns the peer storage currently in use. */
  public ReplicationPeerStorage getPeerStorage() {
    return this.peerStorage;
  }

  /**
   * Method called after a peer has been connected. It will create a ReplicationPeer to track the
   * newly connected cluster.
   * @param peerId a short that identifies the cluster
   * @return {@code true} if this call inserted a new ReplicationPeer into the cache, {@code false}
   *         if a peer with the same id was already cached
   * @throws ReplicationException if connecting to the peer fails
   */
  public boolean addPeer(String peerId) throws ReplicationException {
    // Cheap pre-check so that an already cached peer does not trigger any storage reads.
    if (this.peerCache.containsKey(peerId)) {
      return false;
    }
    // putIfAbsent keeps the check-and-insert atomic: if another thread cached the same peer while
    // we were building ours, the existing instance is kept and this call reports false instead of
    // silently replacing an instance that other components may already hold a reference to.
    return peerCache.putIfAbsent(peerId, createPeer(peerId)) == null;
  }

  /**
   * Removes the peer with the given id from the cache.
   * @param peerId id for the peer
   * @return the peer that was cached under the id, or {@code null} if there was none
   */
  public ReplicationPeerImpl removePeer(String peerId) {
    return peerCache.remove(peerId);
  }

  /**
   * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
   * continue to track changes to the Peer's state and config. This method returns null if no peer
   * has been cached with the given peerId.
   * @param peerId id for the peer
   * @return ReplicationPeer object, or {@code null} if the peer is not cached
   */
  public ReplicationPeerImpl getPeer(String peerId) {
    return peerCache.get(peerId);
  }

  /**
   * Returns the set of peerIds of the clusters that have been connected and have an underlying
   * ReplicationPeer.
   * @return an unmodifiable view of the cached peerIds
   */
  public Set<String> getAllPeerIds() {
    return Collections.unmodifiableSet(peerCache.keySet());
  }

  /** Returns an unmodifiable view of the peer cache, keyed by peer id. */
  public Map<String, ReplicationPeerImpl> getPeerCache() {
    return Collections.unmodifiableMap(peerCache);
  }

  /**
   * Re-reads the enabled flag of the peer from the peer storage and applies it to the cached peer.
   * @param peerId id for the peer
   * @return the peer state reported by the cached peer after the update
   * @throws ReplicationException if reading from the peer storage fails
   * @throws NullPointerException if no peer is cached under {@code peerId}
   */
  public PeerState refreshPeerState(String peerId) throws ReplicationException {
    ReplicationPeerImpl peer = peerCache.get(peerId);
    // The storage is read before the cached peer is validated so that a storage failure still
    // surfaces as a ReplicationException, exactly as it did before the explicit check existed.
    boolean enabled = peerStorage.isPeerEnabled(peerId);
    requireCached(peer, peerId).setPeerState(enabled);
    return peer.getPeerState();
  }

  /**
   * Re-reads the peer config from the peer storage and applies it to the cached peer.
   * @param peerId id for the peer
   * @return the peer config reported by the cached peer after the update
   * @throws ReplicationException if reading from the peer storage fails
   * @throws NullPointerException if no peer is cached under {@code peerId}
   */
  public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
    ReplicationPeerImpl peer = peerCache.get(peerId);
    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
    requireCached(peer, peerId).setPeerConfig(peerConfig);
    return peer.getPeerConfig();
  }

  /**
   * Re-reads the new sync replication state from the peer storage and applies it to the cached
   * peer.
   * @param peerId id for the peer
   * @return the new sync replication state that was read from the peer storage
   * @throws ReplicationException if reading from the peer storage fails
   * @throws NullPointerException if no peer is cached under {@code peerId}
   */
  public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId)
    throws ReplicationException {
    ReplicationPeerImpl peer = peerCache.get(peerId);
    SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId);
    requireCached(peer, peerId).setNewSyncReplicationState(newState);
    return newState;
  }

  /**
   * Asks the cached peer to transit its sync replication state.
   * @param peerId id for the peer
   * @throws NullPointerException if no peer is cached under {@code peerId}
   */
  public void transitPeerSyncReplicationState(String peerId) {
    requireCached(peerCache.get(peerId), peerId).transitSyncReplicationState();
  }

  /**
   * Fails with a descriptive message when the given cache lookup result is {@code null}. The
   * exception type is the same {@link NullPointerException} an unchecked dereference would raise,
   * but the message names the offending peer id.
   * @param peer   result of looking up {@code peerId} in the peer cache, may be {@code null}
   * @param peerId id that was looked up, used for the error message only
   * @return {@code peer}, guaranteed to be non-null
   */
  private static ReplicationPeerImpl requireCached(ReplicationPeerImpl peer, String peerId) {
    return Objects.requireNonNull(peer, () -> "No replication peer cached with id=" + peerId);
  }

  /**
   * Helper method to connect to a peer
   * @param peerId peer's identifier
   * @return object representing the peer
   * @throws ReplicationException if reading the peer's data from the peer storage fails
   */
  private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
    boolean enabled = peerStorage.isPeerEnabled(peerId);
    SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
    SyncReplicationState newSyncReplicationState =
      peerStorage.getPeerNewSyncReplicationState(peerId);
    return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
      peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
  }

  /**
   * Switches to the given configuration, recreates the peer storage from it and pushes the
   * resulting per-peer cluster configuration to every cached peer. A failure for one peer is
   * logged and does not stop the remaining peers from being updated.
   * @param conf the new configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = conf;
    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
    for (ReplicationPeerImpl peer : peerCache.values()) {
      try {
        peer.onConfigurationChange(
          ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
      } catch (ReplicationException e) {
        LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
      }
    }
  }
}