001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.Map; 023import java.util.Set; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 029import org.apache.hadoop.hbase.conf.ConfigurationObserver; 030import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 031import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * This provides an class for maintaining a set of peer clusters. These peers are remote slave 038 * clusters that data is replicated to. 039 * <p> 040 * We implement {@link ConfigurationObserver} mainly for recreating the 041 * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without 042 * restarting the region server. 043 */ 044@InterfaceAudience.Private 045public class ReplicationPeers implements ConfigurationObserver { 046 047 private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class); 048 049 private volatile Configuration conf; 050 051 // Map of peer clusters keyed by their id 052 private final ConcurrentMap<String, ReplicationPeerImpl> peerCache; 053 private final FileSystem fs; 054 private final ZKWatcher zookeeper; 055 private volatile ReplicationPeerStorage peerStorage; 056 057 ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) { 058 this.conf = conf; 059 this.fs = fs; 060 this.zookeeper = zookeeper; 061 this.peerCache = new ConcurrentHashMap<>(); 062 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf); 063 } 064 065 public Configuration getConf() { 066 return conf; 067 } 068 069 public void init() throws ReplicationException { 070 // Loading all existing peerIds into peer cache. 071 for (String peerId : this.peerStorage.listPeerIds()) { 072 addPeer(peerId); 073 } 074 } 075 076 public ReplicationPeerStorage getPeerStorage() { 077 return this.peerStorage; 078 } 079 080 /** 081 * Method called after a peer has been connected. It will create a ReplicationPeer to track the 082 * newly connected cluster. 083 * @param peerId a short that identifies the cluster 084 * @return whether a ReplicationPeer was successfully created 085 * @throws ReplicationException if connecting to the peer fails 086 */ 087 public boolean addPeer(String peerId) throws ReplicationException { 088 if (this.peerCache.containsKey(peerId)) { 089 return false; 090 } 091 092 peerCache.put(peerId, createPeer(peerId)); 093 return true; 094 } 095 096 public ReplicationPeerImpl removePeer(String peerId) { 097 return peerCache.remove(peerId); 098 } 099 100 /** 101 * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will 102 * continue to track changes to the Peer's state and config. This method returns null if no peer 103 * has been cached with the given peerId. 104 * @param peerId id for the peer 105 * @return ReplicationPeer object 106 */ 107 public ReplicationPeerImpl getPeer(String peerId) { 108 return peerCache.get(peerId); 109 } 110 111 /** 112 * Returns the set of peerIds of the clusters that have been connected and have an underlying 113 * ReplicationPeer. 114 * @return a Set of Strings for peerIds 115 */ 116 public Set<String> getAllPeerIds() { 117 return Collections.unmodifiableSet(peerCache.keySet()); 118 } 119 120 public Map<String, ReplicationPeerImpl> getPeerCache() { 121 return Collections.unmodifiableMap(peerCache); 122 } 123 124 public PeerState refreshPeerState(String peerId) throws ReplicationException { 125 ReplicationPeerImpl peer = peerCache.get(peerId); 126 peer.setPeerState(peerStorage.isPeerEnabled(peerId)); 127 return peer.getPeerState(); 128 } 129 130 public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException { 131 ReplicationPeerImpl peer = peerCache.get(peerId); 132 peer.setPeerConfig(peerStorage.getPeerConfig(peerId)); 133 return peer.getPeerConfig(); 134 } 135 136 public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId) 137 throws ReplicationException { 138 ReplicationPeerImpl peer = peerCache.get(peerId); 139 SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId); 140 peer.setNewSyncReplicationState(newState); 141 return newState; 142 } 143 144 public void transitPeerSyncReplicationState(String peerId) { 145 ReplicationPeerImpl peer = peerCache.get(peerId); 146 peer.transitSyncReplicationState(); 147 } 148 149 /** 150 * Helper method to connect to a peer 151 * @param peerId peer's identifier 152 * @return object representing the peer 153 */ 154 private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { 155 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 156 boolean enabled = peerStorage.isPeerEnabled(peerId); 157 SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId); 158 SyncReplicationState newSyncReplicationState = 159 peerStorage.getPeerNewSyncReplicationState(peerId); 160 Configuration peerClusterConf; 161 try { 162 peerClusterConf = ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig); 163 } catch (IOException e) { 164 throw new ReplicationException( 165 "failed to apply cluster key to configuration for peer config " + peerConfig, e); 166 } 167 return new ReplicationPeerImpl(peerClusterConf, peerId, peerConfig, enabled, 168 syncReplicationState, newSyncReplicationState); 169 } 170 171 @Override 172 public void onConfigurationChange(Configuration conf) { 173 this.conf = conf; 174 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf); 175 for (ReplicationPeerImpl peer : peerCache.values()) { 176 try { 177 peer.onConfigurationChange( 178 ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peer.getPeerConfig())); 179 } catch (IOException e) { 180 LOG.warn("failed to reload configuration for peer {}", peer.getId(), e); 181 } 182 } 183 } 184}