/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PeerProcedureHandler} implementation that applies peer changes to the
 * {@link ReplicationSourceManager} of this region server.
 * <p>
 * Every peer-scoped operation acquires a lock keyed by the peer id before touching the peer and
 * releases it when done. {@link #claimReplicationQueue(ReplicationQueueId)} is delegated without
 * taking a peer lock.
 */
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {

  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);

  /** Prefix of the peer configuration keys whose change requires the sources to be refreshed. */
  private static final String REPLICATION_SOURCE_CONFIG_PREFIX = "replication.source.";

  private final ReplicationSourceManager replicationSourceManager;
  private final PeerActionListener peerActionListener;
  private final KeyLocker<String> peersLock = new KeyLocker<>();

  public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager,
    PeerActionListener peerActionListener) {
    this.replicationSourceManager = replicationSourceManager;
    this.peerActionListener = peerActionListener;
  }

  /**
   * Adds the peer to the replication source manager while holding the peer's lock.
   * @param peerId id of the peer to add
   */
  @Override
  public void addPeer(String peerId) throws IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      replicationSourceManager.addPeer(peerId);
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Removes the peer from the replication source manager while holding the peer's lock. Does
   * nothing if the peer is not present in the replication peers.
   * @param peerId id of the peer to remove
   */
  @Override
  public void removePeer(String peerId) throws IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
        replicationSourceManager.removePeer(peerId);
      }
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Looks up a peer and fails if it is unknown.
   * @throws ReplicationException if {@code peers} has no peer with the given id
   */
  private static ReplicationPeerImpl getCachedPeer(ReplicationPeers peers, String peerId)
    throws ReplicationException {
    ReplicationPeerImpl peer = peers.getPeer(peerId);
    if (peer == null) {
      throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
    }
    return peer;
  }

  /** Returns whether the peer went from ENABLED to DISABLED. */
  private static boolean isTransitionToDisabled(PeerState oldState, PeerState newState) {
    return oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED);
  }

  /**
   * Puts the peer back into {@code oldState}. Does nothing if either argument is {@code null},
   * i.e. when the failure happened before the previous state was captured, so that a rollback can
   * never throw a NullPointerException that hides the original failure.
   */
  private static void restorePeerState(ReplicationPeerImpl peer, PeerState oldState) {
    if (peer != null && oldState != null) {
      peer.setPeerState(oldState.equals(PeerState.ENABLED));
    }
  }

  /**
   * Refreshes the state of the peer and, when the peer goes from ENABLED to DISABLED, refreshes
   * its replication sources. If anything fails, the previous peer state is restored.
   * @throws ReplicationException if the peer is not cached
   */
  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
    Lock peerLock = peersLock.acquireLock(peerId);
    ReplicationPeerImpl peer = null;
    PeerState oldState = null;
    boolean success = false;
    try {
      peer = getCachedPeer(peers, peerId);
      oldState = peer.getPeerState();
      PeerState newState = peers.refreshPeerState(peerId);
      // The RS needs to pick up the new replication state.
      if (isTransitionToDisabled(oldState, newState)) {
        replicationSourceManager.refreshSources(peerId);
      }
      success = true;
    } finally {
      try {
        if (!success) {
          // Reset the peer state if the refresh failed.
          restorePeerState(peer, oldState);
        }
      } finally {
        // Always release the lock, even if the rollback itself fails.
        peerLock.unlock();
      }
    }
  }

  @Override
  public void enablePeer(String peerId) throws ReplicationException, IOException {
    refreshPeerState(peerId);
  }

  @Override
  public void disablePeer(String peerId) throws ReplicationException, IOException {
    refreshPeerState(peerId);
  }

  /**
   * Returns whether any {@code replication.source.*} configuration entry was added, changed or
   * removed between the two peer configurations.
   */
  private boolean hasReplicationConfigChange(ReplicationPeerConfig oldConfig,
    ReplicationPeerConfig newConfig) {
    Map<String, String> oldReplicationConfigs = oldConfig.getConfiguration();
    Map<String, String> newReplicationConfigs = newConfig.getConfiguration();

    // Added or changed keys. Objects.equals keeps a null value from blowing up the comparison.
    for (Map.Entry<String, String> entry : newReplicationConfigs.entrySet()) {
      String key = entry.getKey();
      if (
        key.startsWith(REPLICATION_SOURCE_CONFIG_PREFIX)
          && !Objects.equals(entry.getValue(), oldReplicationConfigs.get(key))
      ) {
        return true;
      }
    }

    // Removed keys.
    for (String key : oldReplicationConfigs.keySet()) {
      if (
        key.startsWith(REPLICATION_SOURCE_CONFIG_PREFIX) && !newReplicationConfigs.containsKey(key)
      ) {
        return true;
      }
    }

    return false;
  }

  /**
   * Refreshes both the configuration and the state of the peer, and refreshes its replication
   * sources when the namespaces/table CFs, the serial flag or any {@code replication.source.*}
   * configuration changed, or when the peer went from ENABLED to DISABLED. If anything fails, the
   * previous configuration and state are restored.
   * @param peerId id of the peer whose configuration was updated
   * @throws ReplicationException if the peer is not cached
   */
  @Override
  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
    Lock peerLock = peersLock.acquireLock(peerId);
    ReplicationPeerImpl peer = null;
    ReplicationPeerConfig oldConfig = null;
    PeerState oldState = null;
    boolean success = false;
    try {
      peer = getCachedPeer(peers, peerId);
      oldConfig = peer.getPeerConfig();
      oldState = peer.getPeerState();
      ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
      // Also need to refresh the peer state here. When updating a serial replication peer we may
      // disable it first and then enable it.
      PeerState newState = peers.refreshPeerState(peerId);
      // The RS needs to pick up the new replication config.
      if (
        !ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig)
          || oldConfig.isSerial() != newConfig.isSerial()
          || hasReplicationConfigChange(oldConfig, newConfig)
          || isTransitionToDisabled(oldState, newState)
      ) {
        replicationSourceManager.refreshSources(peerId);
      }
      success = true;
    } finally {
      try {
        if (!success && peer != null) {
          // Reset the peer config and state if the refresh failed. Each value is only restored
          // if it was captured before the failure.
          if (oldConfig != null) {
            peer.setPeerConfig(oldConfig);
          }
          restorePeerState(peer, oldState);
        }
      } finally {
        // Always release the lock, even if the rollback itself fails.
        peerLock.unlock();
      }
    }
  }

  /**
   * Runs one stage of a sync replication state transition for the peer.
   * <p>
   * Stage 0 refreshes the peer state and the new sync replication state and notifies the
   * {@link PeerActionListener}. Any other stage notifies the listener and then completes the
   * transition on the peer; when the new state is STANDBY the sources are drained and all WALs
   * are rolled first. A call that finds the stage already applied is treated as a retry and
   * returns without doing anything.
   * @param peerId id of the peer
   * @param stage  stage of the transition, {@code 0} for the first one
   * @param rs     region server used to obtain the WAL roller
   * @throws ReplicationException if the peer is not cached or is not a sync replication peer
   * @throws IOException          if refreshing fails or the wait for the WAL roll is interrupted
   */
  @Override
  public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
    throws ReplicationException, IOException {
    ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers();
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      ReplicationPeerImpl peer = getCachedPeer(replicationPeers, peerId);
      if (!peer.getPeerConfig().isSyncReplication()) {
        throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
      }
      SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
      if (stage == 0) {
        if (newSyncReplicationState != SyncReplicationState.NONE) {
          LOG.warn("The new sync replication state for peer {} has already been set to {}, "
            + "this should be a retry, give up", peerId, newSyncReplicationState);
          return;
        }
        // Refresh the peer state first, as when we transit to STANDBY, we may need to disable the
        // peer before processing the sync replication state.
        PeerState oldState = peer.getPeerState();
        boolean success = false;
        try {
          PeerState newState = replicationPeers.refreshPeerState(peerId);
          if (isTransitionToDisabled(oldState, newState)) {
            replicationSourceManager.refreshSources(peerId);
          }
          success = true;
        } finally {
          if (!success) {
            restorePeerState(peer, oldState);
          }
        }
        newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
        SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
        peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
          newSyncReplicationState, stage);
      } else {
        if (newSyncReplicationState == SyncReplicationState.NONE) {
          // Log the peer's actual current state; the new state is always NONE in this branch.
          LOG.warn(
            "The new sync replication state for peer {} has already been clear, and the "
              + "current state is {}, this should be a retry, give up",
            peerId, peer.getSyncReplicationState());
          return;
        }
        if (newSyncReplicationState == SyncReplicationState.STANDBY) {
          replicationSourceManager.drainSources(peerId);
          // Need to roll the wals and make the ReplicationSource for this peer track the new file.
          // If we do not do this, there will be two problems that can not be addressed at the same
          // time. First, if we just throw away the current wal file, and later when we transit the
          // peer to DA, and the wal has not been rolled yet, then the new data written to the wal
          // file will not be replicated and cause data inconsistency. But if we just track the
          // current wal file without rolling, it may contain some data before we transit the peer
          // to S, later if we transit the peer to DA, the data will also be replicated and cause
          // data inconsistency. So here we need to roll the wal, and let the ReplicationSource
          // track the new wal file, and throw the old wal files away.
          LogRoller roller = rs.getWalRoller();
          roller.requestRollAll();
          try {
            roller.waitUntilWalRollFinished();
          } catch (InterruptedException e) {
            // reset the interrupted flag
            Thread.currentThread().interrupt();
            throw (IOException) new InterruptedIOException(
              "Interrupted while waiting for wal roll finish").initCause(e);
          }
        }
        SyncReplicationState oldState = peer.getSyncReplicationState();
        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
          stage);
        peer.transitSyncReplicationState();
      }
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Delegates the claim of the given replication queue to the replication source manager.
   * @param queueId id of the queue to claim
   */
  @Override
  public void claimReplicationQueue(ReplicationQueueId queueId)
    throws ReplicationException, IOException {
    replicationSourceManager.claimQueue(queueId);
  }
}