001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Map;
023import java.util.Set;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.conf.ConfigurationObserver;
027import org.apache.hadoop.hbase.util.Pair;
028import org.apache.yetus.audience.InterfaceAudience;
029
030@InterfaceAudience.Private
031public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver {
032
033  private volatile Configuration conf;
034
035  private final String id;
036
037  private volatile ReplicationPeerConfig peerConfig;
038
039  private volatile PeerState peerState;
040
041  // The lower 16 bits are the current sync replication state, the higher 16 bits are the new sync
042  // replication state. Embedded in one int so user can not get an inconsistency view of state and
043  // new state.
044  private volatile int syncReplicationStateBits;
045
046  private static final int SHIFT = 16;
047
048  private static final int AND_BITS = 0xFFFF;
049
050  private final List<ReplicationPeerConfigListener> peerConfigListeners;
051
052  /**
053   * Constructor that takes all the objects required to communicate with the specified peer, except
054   * for the region server addresses.
055   * @param conf       configuration object to this peer
056   * @param id         string representation of this peer's identifier
057   * @param peerConfig configuration for the replication peer
058   */
059  public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
060    boolean peerState, SyncReplicationState syncReplicationState,
061    SyncReplicationState newSyncReplicationState) {
062    this.conf = conf;
063    this.id = id;
064    setPeerState(peerState);
065    this.peerConfig = peerConfig;
066    this.syncReplicationStateBits =
067      syncReplicationState.value() | (newSyncReplicationState.value() << SHIFT);
068    this.peerConfigListeners = new ArrayList<>();
069  }
070
071  public void setPeerState(boolean enabled) {
072    this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
073  }
074
075  public void setPeerConfig(ReplicationPeerConfig peerConfig) {
076    this.peerConfig = peerConfig;
077    peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
078  }
079
080  public void setNewSyncReplicationState(SyncReplicationState newState) {
081    this.syncReplicationStateBits =
082      (this.syncReplicationStateBits & AND_BITS) | (newState.value() << SHIFT);
083  }
084
085  public void transitSyncReplicationState() {
086    this.syncReplicationStateBits =
087      (this.syncReplicationStateBits >>> SHIFT) | (SyncReplicationState.NONE.value() << SHIFT);
088  }
089
090  /**
091   * Get the identifier of this peer
092   * @return string representation of the id (short)
093   */
094  @Override
095  public String getId() {
096    return id;
097  }
098
099  @Override
100  public PeerState getPeerState() {
101    return peerState;
102  }
103
104  private static SyncReplicationState getSyncReplicationState(int bits) {
105    return SyncReplicationState.valueOf(bits & AND_BITS);
106  }
107
108  private static SyncReplicationState getNewSyncReplicationState(int bits) {
109    return SyncReplicationState.valueOf(bits >>> SHIFT);
110  }
111
112  public Pair<SyncReplicationState, SyncReplicationState> getSyncReplicationStateAndNewState() {
113    int bits = this.syncReplicationStateBits;
114    return Pair.newPair(getSyncReplicationState(bits), getNewSyncReplicationState(bits));
115  }
116
117  public SyncReplicationState getNewSyncReplicationState() {
118    return getNewSyncReplicationState(syncReplicationStateBits);
119  }
120
121  @Override
122  public SyncReplicationState getSyncReplicationState() {
123    return getSyncReplicationState(syncReplicationStateBits);
124  }
125
126  @Override
127  public ReplicationPeerConfig getPeerConfig() {
128    return peerConfig;
129  }
130
131  @Override
132  public Configuration getConfiguration() {
133    return conf;
134  }
135
136  @Override
137  public Map<TableName, List<String>> getTableCFs() {
138    return this.peerConfig.getTableCFsMap();
139  }
140
141  @Override
142  public Set<String> getNamespaces() {
143    return this.peerConfig.getNamespaces();
144  }
145
146  @Override
147  public long getPeerBandwidth() {
148    return this.peerConfig.getBandwidth();
149  }
150
151  @Override
152  public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
153    this.peerConfigListeners.add(listener);
154  }
155
156  @Override
157  public void onConfigurationChange(Configuration conf) {
158    this.conf = conf;
159  }
160}