/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * {@link ReplicationPeer} implementation that holds the id, configuration, enabled/disabled state
 * and sync replication state of a single peer, and notifies registered
 * {@link ReplicationPeerConfigListener}s when the peer config is replaced.
 * <p>
 * All mutable fields are {@code volatile}, so a reader observes the most recently written value.
 */
@InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver {

  private volatile Configuration conf;

  private final String id;

  private volatile ReplicationPeerConfig peerConfig;

  private volatile PeerState peerState;

  // The lower 16 bits are the current sync replication state, the higher 16 bits are the new sync
  // replication state. Both are embedded in one int so that a reader can never observe an
  // inconsistent view of the state and the new state.
  private volatile int syncReplicationStateBits;

  private static final int SHIFT = 16;

  private static final int AND_BITS = 0xFFFF;

  // NOTE(review): plain ArrayList; registration and notification are not synchronized here.
  // Presumably listeners are registered before config updates can race with them - confirm.
  private final List<ReplicationPeerConfigListener> peerConfigListeners;

  /**
   * Constructor that takes all the objects required to communicate with the specified peer, except
   * for the region server addresses.
   * @param conf                    configuration object to this peer
   * @param id                      string representation of this peer's identifier
   * @param peerConfig              configuration for the replication peer
   * @param peerState               {@code true} if the peer is enabled, {@code false} if disabled
   * @param syncReplicationState    the current sync replication state of the peer
   * @param newSyncReplicationState the sync replication state the peer is moving to
   */
  public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
    boolean peerState, SyncReplicationState syncReplicationState,
    SyncReplicationState newSyncReplicationState) {
    this.conf = conf;
    this.id = id;
    // Assign the field directly rather than going through the public, overridable
    // setPeerState(boolean): a subclass override must not run on a partially constructed object.
    this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
    this.peerConfig = peerConfig;
    this.syncReplicationStateBits =
      packStates(syncReplicationState.value(), newSyncReplicationState.value());
    this.peerConfigListeners = new ArrayList<>();
  }

  /**
   * Packs two sync replication state values into the single int layout used by
   * {@link #syncReplicationStateBits}.
   * @param current value stored in the lower 16 bits
   * @param next    value stored in the higher 16 bits
   * @return the combined bits
   */
  private static int packStates(int current, int next) {
    return current | (next << SHIFT);
  }

  /**
   * Sets whether this peer is enabled.
   * @param enabled {@code true} maps to {@link PeerState#ENABLED}, {@code false} to
   *                {@link PeerState#DISABLED}
   */
  public void setPeerState(boolean enabled) {
    this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
  }

  /**
   * Replaces the peer config and then notifies every registered listener with the new config.
   * @param peerConfig the new configuration for the replication peer
   */
  public void setPeerConfig(ReplicationPeerConfig peerConfig) {
    this.peerConfig = peerConfig;
    peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
  }

  /**
   * Records the sync replication state this peer is moving to, keeping the current state as is.
   * @param newState the new sync replication state
   */
  public void setNewSyncReplicationState(SyncReplicationState newState) {
    // NOTE(review): read-modify-write on a volatile field is not atomic; presumably callers
    // serialize state changes for a peer - confirm.
    int bits = this.syncReplicationStateBits;
    this.syncReplicationStateBits = packStates(bits & AND_BITS, newState.value());
  }

  /**
   * Promotes the new sync replication state to be the current state and resets the new state to
   * {@link SyncReplicationState#NONE}.
   */
  public void transitSyncReplicationState() {
    // NOTE(review): same non-atomic read-modify-write caveat as setNewSyncReplicationState.
    int bits = this.syncReplicationStateBits;
    this.syncReplicationStateBits = packStates(bits >>> SHIFT, SyncReplicationState.NONE.value());
  }

  /**
   * Get the identifier of this peer
   * @return string representation of the id
   */
  @Override
  public String getId() {
    return id;
  }

  /**
   * Get the enabled/disabled state of this peer.
   * @return the current peer state
   */
  @Override
  public PeerState getPeerState() {
    return peerState;
  }

  /** Decodes the current sync replication state from the lower 16 bits of {@code bits}. */
  private static SyncReplicationState getSyncReplicationState(int bits) {
    return SyncReplicationState.valueOf(bits & AND_BITS);
  }

  /** Decodes the new sync replication state from the higher 16 bits of {@code bits}. */
  private static SyncReplicationState getNewSyncReplicationState(int bits) {
    return SyncReplicationState.valueOf(bits >>> SHIFT);
  }

  /**
   * Get the current and the new sync replication state, both decoded from a single read of the
   * packed field so the two values belong to the same snapshot.
   * @return a pair of (current state, new state)
   */
  public Pair<SyncReplicationState, SyncReplicationState> getSyncReplicationStateAndNewState() {
    int bits = this.syncReplicationStateBits;
    return Pair.newPair(getSyncReplicationState(bits), getNewSyncReplicationState(bits));
  }

  /**
   * Get the sync replication state this peer is moving to.
   * @return the new sync replication state
   */
  public SyncReplicationState getNewSyncReplicationState() {
    return getNewSyncReplicationState(syncReplicationStateBits);
  }

  /**
   * Get the current sync replication state of this peer.
   * @return the current sync replication state
   */
  @Override
  public SyncReplicationState getSyncReplicationState() {
    return getSyncReplicationState(syncReplicationStateBits);
  }

  /**
   * Get the peer config object
   * @return the ReplicationPeerConfig for this peer
   */
  @Override
  public ReplicationPeerConfig getPeerConfig() {
    return peerConfig;
  }

  /**
   * Get the configuration object of this peer
   * @return the most recently set configuration object
   */
  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  /**
   * Get the table-to-column-families map of the current peer config.
   * @return the result of {@link ReplicationPeerConfig#getTableCFsMap()}
   */
  @Override
  public Map<TableName, List<String>> getTableCFs() {
    return this.peerConfig.getTableCFsMap();
  }

  /**
   * Get the namespaces of the current peer config.
   * @return the result of {@link ReplicationPeerConfig#getNamespaces()}
   */
  @Override
  public Set<String> getNamespaces() {
    return this.peerConfig.getNamespaces();
  }

  /**
   * Get the bandwidth of the current peer config.
   * @return the result of {@link ReplicationPeerConfig#getBandwidth()}
   */
  @Override
  public long getPeerBandwidth() {
    return this.peerConfig.getBandwidth();
  }

  /**
   * Registers a listener that is called back by {@link #setPeerConfig(ReplicationPeerConfig)}.
   * @param listener the listener to add
   */
  @Override
  public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
    this.peerConfigListeners.add(listener);
  }

  /**
   * Replaces the configuration object held by this peer.
   * @param conf the new configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = conf;
  }
}