001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.Map;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
029import org.apache.hadoop.hbase.conf.ConfigurationObserver;
030import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
031import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * This provides an class for maintaining a set of peer clusters. These peers are remote slave
038 * clusters that data is replicated to.
039 * <p>
040 * We implement {@link ConfigurationObserver} mainly for recreating the
041 * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
042 * restarting the region server.
043 */
044@InterfaceAudience.Private
045public class ReplicationPeers implements ConfigurationObserver {
046
047  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
048
049  private volatile Configuration conf;
050
051  // Map of peer clusters keyed by their id
052  private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
053  private final FileSystem fs;
054  private final ZKWatcher zookeeper;
055  private volatile ReplicationPeerStorage peerStorage;
056
057  ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
058    this.conf = conf;
059    this.fs = fs;
060    this.zookeeper = zookeeper;
061    this.peerCache = new ConcurrentHashMap<>();
062    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
063  }
064
065  public Configuration getConf() {
066    return conf;
067  }
068
069  public void init() throws ReplicationException {
070    // Loading all existing peerIds into peer cache.
071    for (String peerId : this.peerStorage.listPeerIds()) {
072      addPeer(peerId);
073    }
074  }
075
076  public ReplicationPeerStorage getPeerStorage() {
077    return this.peerStorage;
078  }
079
080  /**
081   * Method called after a peer has been connected. It will create a ReplicationPeer to track the
082   * newly connected cluster.
083   * @param peerId a short that identifies the cluster
084   * @return whether a ReplicationPeer was successfully created
085   * @throws ReplicationException if connecting to the peer fails
086   */
087  public boolean addPeer(String peerId) throws ReplicationException {
088    if (this.peerCache.containsKey(peerId)) {
089      return false;
090    }
091
092    peerCache.put(peerId, createPeer(peerId));
093    return true;
094  }
095
096  public ReplicationPeerImpl removePeer(String peerId) {
097    return peerCache.remove(peerId);
098  }
099
100  /**
101   * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
102   * continue to track changes to the Peer's state and config. This method returns null if no peer
103   * has been cached with the given peerId.
104   * @param peerId id for the peer
105   * @return ReplicationPeer object
106   */
107  public ReplicationPeerImpl getPeer(String peerId) {
108    return peerCache.get(peerId);
109  }
110
111  /**
112   * Returns the set of peerIds of the clusters that have been connected and have an underlying
113   * ReplicationPeer.
114   * @return a Set of Strings for peerIds
115   */
116  public Set<String> getAllPeerIds() {
117    return Collections.unmodifiableSet(peerCache.keySet());
118  }
119
120  public Map<String, ReplicationPeerImpl> getPeerCache() {
121    return Collections.unmodifiableMap(peerCache);
122  }
123
124  public PeerState refreshPeerState(String peerId) throws ReplicationException {
125    ReplicationPeerImpl peer = peerCache.get(peerId);
126    peer.setPeerState(peerStorage.isPeerEnabled(peerId));
127    return peer.getPeerState();
128  }
129
130  public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
131    ReplicationPeerImpl peer = peerCache.get(peerId);
132    peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
133    return peer.getPeerConfig();
134  }
135
136  public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId)
137    throws ReplicationException {
138    ReplicationPeerImpl peer = peerCache.get(peerId);
139    SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId);
140    peer.setNewSyncReplicationState(newState);
141    return newState;
142  }
143
144  public void transitPeerSyncReplicationState(String peerId) {
145    ReplicationPeerImpl peer = peerCache.get(peerId);
146    peer.transitSyncReplicationState();
147  }
148
149  /**
150   * Helper method to connect to a peer
151   * @param peerId peer's identifier
152   * @return object representing the peer
153   */
154  private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
155    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
156    boolean enabled = peerStorage.isPeerEnabled(peerId);
157    SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
158    SyncReplicationState newSyncReplicationState =
159      peerStorage.getPeerNewSyncReplicationState(peerId);
160    Configuration peerClusterConf;
161    try {
162      peerClusterConf = ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig);
163    } catch (IOException e) {
164      throw new ReplicationException(
165        "failed to apply cluster key to configuration for peer config " + peerConfig, e);
166    }
167    return new ReplicationPeerImpl(peerClusterConf, peerId, peerConfig, enabled,
168      syncReplicationState, newSyncReplicationState);
169  }
170
171  @Override
172  public void onConfigurationChange(Configuration conf) {
173    this.conf = conf;
174    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
175    for (ReplicationPeerImpl peer : peerCache.values()) {
176      try {
177        peer.onConfigurationChange(
178          ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peer.getPeerConfig()));
179      } catch (IOException e) {
180        LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
181      }
182    }
183  }
184}