001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.Collections;
021import java.util.Map;
022import java.util.Set;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.hbase.conf.ConfigurationObserver;
028import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
029import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * This provides an class for maintaining a set of peer clusters. These peers are remote slave
036 * clusters that data is replicated to.
037 * <p>
038 * We implement {@link ConfigurationObserver} mainly for recreating the
039 * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
040 * restarting the region server.
041 */
042@InterfaceAudience.Private
043public class ReplicationPeers implements ConfigurationObserver {
044
045  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
046
047  private volatile Configuration conf;
048
049  // Map of peer clusters keyed by their id
050  private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
051  private final FileSystem fs;
052  private final ZKWatcher zookeeper;
053  private volatile ReplicationPeerStorage peerStorage;
054
055  ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
056    this.conf = conf;
057    this.fs = fs;
058    this.zookeeper = zookeeper;
059    this.peerCache = new ConcurrentHashMap<>();
060    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
061  }
062
063  public Configuration getConf() {
064    return conf;
065  }
066
067  public void init() throws ReplicationException {
068    // Loading all existing peerIds into peer cache.
069    for (String peerId : this.peerStorage.listPeerIds()) {
070      addPeer(peerId);
071    }
072  }
073
074  public ReplicationPeerStorage getPeerStorage() {
075    return this.peerStorage;
076  }
077
078  /**
079   * Method called after a peer has been connected. It will create a ReplicationPeer to track the
080   * newly connected cluster.
081   * @param peerId a short that identifies the cluster
082   * @return whether a ReplicationPeer was successfully created
083   * @throws ReplicationException if connecting to the peer fails
084   */
085  public boolean addPeer(String peerId) throws ReplicationException {
086    if (this.peerCache.containsKey(peerId)) {
087      return false;
088    }
089
090    peerCache.put(peerId, createPeer(peerId));
091    return true;
092  }
093
094  public ReplicationPeerImpl removePeer(String peerId) {
095    return peerCache.remove(peerId);
096  }
097
098  /**
099   * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
100   * continue to track changes to the Peer's state and config. This method returns null if no peer
101   * has been cached with the given peerId.
102   * @param peerId id for the peer
103   * @return ReplicationPeer object
104   */
105  public ReplicationPeerImpl getPeer(String peerId) {
106    return peerCache.get(peerId);
107  }
108
109  /**
110   * Returns the set of peerIds of the clusters that have been connected and have an underlying
111   * ReplicationPeer.
112   * @return a Set of Strings for peerIds
113   */
114  public Set<String> getAllPeerIds() {
115    return Collections.unmodifiableSet(peerCache.keySet());
116  }
117
118  public Map<String, ReplicationPeerImpl> getPeerCache() {
119    return Collections.unmodifiableMap(peerCache);
120  }
121
122  public PeerState refreshPeerState(String peerId) throws ReplicationException {
123    ReplicationPeerImpl peer = peerCache.get(peerId);
124    peer.setPeerState(peerStorage.isPeerEnabled(peerId));
125    return peer.getPeerState();
126  }
127
128  public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
129    ReplicationPeerImpl peer = peerCache.get(peerId);
130    peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
131    return peer.getPeerConfig();
132  }
133
134  public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId)
135    throws ReplicationException {
136    ReplicationPeerImpl peer = peerCache.get(peerId);
137    SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId);
138    peer.setNewSyncReplicationState(newState);
139    return newState;
140  }
141
142  public void transitPeerSyncReplicationState(String peerId) {
143    ReplicationPeerImpl peer = peerCache.get(peerId);
144    peer.transitSyncReplicationState();
145  }
146
147  /**
148   * Helper method to connect to a peer
149   * @param peerId peer's identifier
150   * @return object representing the peer
151   */
152  private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
153    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
154    boolean enabled = peerStorage.isPeerEnabled(peerId);
155    SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
156    SyncReplicationState newSyncReplicationState =
157      peerStorage.getPeerNewSyncReplicationState(peerId);
158    return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
159      peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
160  }
161
162  @Override
163  public void onConfigurationChange(Configuration conf) {
164    this.conf = conf;
165    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
166    for (ReplicationPeerImpl peer : peerCache.values()) {
167      try {
168        peer.onConfigurationChange(
169          ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
170      } catch (ReplicationException e) {
171        LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
172      }
173    }
174  }
175}