001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Map;
023import java.util.Set;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.util.Pair;
027import org.apache.yetus.audience.InterfaceAudience;
028
029@InterfaceAudience.Private
030public class ReplicationPeerImpl implements ReplicationPeer {
031
032  private final Configuration conf;
033
034  private final String id;
035
036  private volatile ReplicationPeerConfig peerConfig;
037
038  private volatile PeerState peerState;
039
040  // The lower 16 bits are the current sync replication state, the higher 16 bits are the new sync
041  // replication state. Embedded in one int so user can not get an inconsistency view of state and
042  // new state.
043  private volatile int syncReplicationStateBits;
044
045  private static final int SHIFT = 16;
046
047  private static final int AND_BITS = 0xFFFF;
048
049  private final List<ReplicationPeerConfigListener> peerConfigListeners;
050
051  /**
052   * Constructor that takes all the objects required to communicate with the specified peer, except
053   * for the region server addresses.
054   * @param conf       configuration object to this peer
055   * @param id         string representation of this peer's identifier
056   * @param peerConfig configuration for the replication peer
057   */
058  public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
059    boolean peerState, SyncReplicationState syncReplicationState,
060    SyncReplicationState newSyncReplicationState) {
061    this.conf = conf;
062    this.id = id;
063    setPeerState(peerState);
064    this.peerConfig = peerConfig;
065    this.syncReplicationStateBits =
066      syncReplicationState.value() | (newSyncReplicationState.value() << SHIFT);
067    this.peerConfigListeners = new ArrayList<>();
068  }
069
070  public void setPeerState(boolean enabled) {
071    this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
072  }
073
074  public void setPeerConfig(ReplicationPeerConfig peerConfig) {
075    this.peerConfig = peerConfig;
076    peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
077  }
078
079  public void setNewSyncReplicationState(SyncReplicationState newState) {
080    this.syncReplicationStateBits =
081      (this.syncReplicationStateBits & AND_BITS) | (newState.value() << SHIFT);
082  }
083
084  public void transitSyncReplicationState() {
085    this.syncReplicationStateBits =
086      (this.syncReplicationStateBits >>> SHIFT) | (SyncReplicationState.NONE.value() << SHIFT);
087  }
088
089  /**
090   * Get the identifier of this peer
091   * @return string representation of the id (short)
092   */
093  @Override
094  public String getId() {
095    return id;
096  }
097
098  @Override
099  public PeerState getPeerState() {
100    return peerState;
101  }
102
103  private static SyncReplicationState getSyncReplicationState(int bits) {
104    return SyncReplicationState.valueOf(bits & AND_BITS);
105  }
106
107  private static SyncReplicationState getNewSyncReplicationState(int bits) {
108    return SyncReplicationState.valueOf(bits >>> SHIFT);
109  }
110
111  public Pair<SyncReplicationState, SyncReplicationState> getSyncReplicationStateAndNewState() {
112    int bits = this.syncReplicationStateBits;
113    return Pair.newPair(getSyncReplicationState(bits), getNewSyncReplicationState(bits));
114  }
115
116  public SyncReplicationState getNewSyncReplicationState() {
117    return getNewSyncReplicationState(syncReplicationStateBits);
118  }
119
120  @Override
121  public SyncReplicationState getSyncReplicationState() {
122    return getSyncReplicationState(syncReplicationStateBits);
123  }
124
125  @Override
126  public ReplicationPeerConfig getPeerConfig() {
127    return peerConfig;
128  }
129
130  @Override
131  public Configuration getConfiguration() {
132    return conf;
133  }
134
135  @Override
136  public Map<TableName, List<String>> getTableCFs() {
137    return this.peerConfig.getTableCFsMap();
138  }
139
140  @Override
141  public Set<String> getNamespaces() {
142    return this.peerConfig.getNamespaces();
143  }
144
145  @Override
146  public long getPeerBandwidth() {
147    return this.peerConfig.getBandwidth();
148  }
149
150  @Override
151  public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
152    this.peerConfigListeners.add(listener);
153  }
154}