/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PeerProcedureHandler} implementation that forwards peer changes to a
 * {@link ReplicationSourceManager} and a {@link PeerActionListener}.
 * <p>
 * Every peer-scoped operation acquires the lock for its peer id from a {@link KeyLocker} before
 * touching the peer and releases it in a {@code finally} block.
 * {@link #claimReplicationQueue(ReplicationQueueId)} is the exception: it takes no lock.
 */
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {

  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);

  private final ReplicationSourceManager replicationSourceManager;
  private final PeerActionListener peerActionListener;
  private final KeyLocker<String> peersLock = new KeyLocker<>();

  /**
   * @param replicationSourceManager manager that owns the cached peers and their sources
   * @param peerActionListener listener notified of sync replication state changes
   */
  public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager,
    PeerActionListener peerActionListener) {
    this.replicationSourceManager = replicationSourceManager;
    this.peerActionListener = peerActionListener;
  }

  /**
   * Adds the peer to the replication source manager while holding the lock for {@code peerId}.
   */
  @Override
  public void addPeer(String peerId) throws IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      replicationSourceManager.addPeer(peerId);
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Removes the peer from the replication source manager while holding the lock for
   * {@code peerId}. Does nothing when the peer is not currently cached.
   */
  @Override
  public void removePeer(String peerId) throws IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
        replicationSourceManager.removePeer(peerId);
      }
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Reloads the peer state and, when the peer went from ENABLED to DISABLED, refreshes its
   * sources. If anything fails after the cached peer was found, the previous state is written
   * back to the cached peer before the exception propagates.
   * @throws ReplicationException if the peer is not cached
   */
  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    ReplicationPeerImpl peer = null;
    PeerState oldState = null;
    boolean success = false;
    try {
      peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
      if (peer == null) {
        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
      }
      oldState = peer.getPeerState();
      PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
      // RS need to start work with the new replication state change
      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
        replicationSourceManager.refreshSources(peerId);
      }
      success = true;
    } finally {
      if (!success && peer != null) {
        // Reset peer state if refresh source failed
        peer.setPeerState(oldState.equals(PeerState.ENABLED));
      }
      peerLock.unlock();
    }
  }

  /** Delegates to {@link #refreshPeerState(String)}. */
  @Override
  public void enablePeer(String peerId) throws ReplicationException, IOException {
    refreshPeerState(peerId);
  }

  /** Delegates to {@link #refreshPeerState(String)}. */
  @Override
  public void disablePeer(String peerId) throws ReplicationException, IOException {
    refreshPeerState(peerId);
  }

  /**
   * Reloads both the peer config and the peer state, then refreshes the peer's sources when the
   * namespaces/table-CFs changed, the serial flag changed, or the peer went from ENABLED to
   * DISABLED. If anything fails after the cached peer was found, the previous config and state
   * are written back to the cached peer before the exception propagates.
   * @throws ReplicationException if the peer is not cached
   */
  @Override
  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
    ReplicationPeerImpl peer = null;
    ReplicationPeerConfig oldConfig = null;
    PeerState oldState = null;
    boolean success = false;
    try {
      peer = peers.getPeer(peerId);
      if (peer == null) {
        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
      }
      oldConfig = peer.getPeerConfig();
      oldState = peer.getPeerState();
      ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
      // also need to refresh peer state here. When updating a serial replication peer we may
      // disable it first and then enable it.
      PeerState newState = peers.refreshPeerState(peerId);
      // RS need to start work with the new replication config change
      if (
        !ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig)
          || oldConfig.isSerial() != newConfig.isSerial()
          || (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))
      ) {
        replicationSourceManager.refreshSources(peerId);
      }
      success = true;
    } finally {
      if (!success && peer != null) {
        // Reset peer config if refresh source failed
        peer.setPeerConfig(oldConfig);
        peer.setPeerState(oldState.equals(PeerState.ENABLED));
      }
      peerLock.unlock();
    }
  }

  /**
   * Runs one stage of a sync replication state transition for the given peer.
   * <p>
   * When {@code stage == 0}: refreshes the peer state (refreshing sources on an ENABLED to
   * DISABLED change), loads the new sync replication state and notifies the
   * {@link PeerActionListener}. For any other stage: if the new state is STANDBY, drains the
   * peer's sources and rolls all WALs; then notifies the listener and commits the transition on
   * the cached peer.
   * <p>
   * In both branches the method logs a warning and returns without doing any work when the
   * pending new state indicates that this stage was already executed (treated as a retry).
   * @param peerId id of the peer being transitioned
   * @param stage {@code 0} selects the first branch described above, any other value the second
   * @param rs region server whose WAL roller is used when transitioning to STANDBY
   * @throws ReplicationException if the peer is not cached or is not a sync replication peer
   * @throws IOException if the thread is interrupted while waiting for the WAL roll (as an
   *           {@link InterruptedIOException}, with the interrupt flag restored), or if a
   *           delegated call fails
   */
  @Override
  public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
    throws ReplicationException, IOException {
    ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers();
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
      if (peer == null) {
        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
      }
      if (!peer.getPeerConfig().isSyncReplication()) {
        throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
      }
      SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
      if (stage == 0) {
        if (newSyncReplicationState != SyncReplicationState.NONE) {
          LOG.warn("The new sync replication state for peer {} has already been set to {}, "
            + "this should be a retry, give up", peerId, newSyncReplicationState);
          return;
        }
        // refresh the peer state first, as when we transit to STANDBY, we may need to disable the
        // peer before processing the sync replication state.
        PeerState oldState = peer.getPeerState();
        boolean success = false;
        try {
          PeerState newState = replicationPeers.refreshPeerState(peerId);
          if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
            replicationSourceManager.refreshSources(peerId);
          }
          success = true;
        } finally {
          if (!success) {
            // Restore the previous peer state if the refresh failed
            peer.setPeerState(oldState.equals(PeerState.ENABLED));
          }
        }
        newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
        SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
        peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
          newSyncReplicationState, stage);
      } else {
        if (newSyncReplicationState == SyncReplicationState.NONE) {
          // Log the peer's current sync replication state: the pending new state is always NONE
          // in this branch, so printing it would carry no information.
          LOG.warn(
            "The new sync replication state for peer {} has already been clear, and the "
              + "current state is {}, this should be a retry, give up",
            peerId, peer.getSyncReplicationState());
          return;
        }
        if (newSyncReplicationState == SyncReplicationState.STANDBY) {
          replicationSourceManager.drainSources(peerId);
          // Need to roll the wals and make the ReplicationSource for this peer track the new file.
          // If we do not do this, there will be two problems that can not be addressed at the same
          // time. First, if we just throw away the current wal file, and later when we transit the
          // peer to DA, and the wal has not been rolled yet, then the new data written to the wal
          // file will not be replicated and cause data inconsistency. But if we just track the
          // current wal file without rolling, it may contain some data before we transit the peer
          // to S, later if we transit the peer to DA, the data will also be replicated and cause
          // data inconsistency. So here we need to roll the wal, and let the ReplicationSource
          // track the new wal file, and throw the old wal files away.
          LogRoller roller = rs.getWalRoller();
          roller.requestRollAll();
          try {
            roller.waitUntilWalRollFinished();
          } catch (InterruptedException e) {
            // reset the interrupted flag
            Thread.currentThread().interrupt();
            throw (IOException) new InterruptedIOException(
              "Interrupted while waiting for wal roll finish").initCause(e);
          }
        }
        SyncReplicationState oldState = peer.getSyncReplicationState();
        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
          stage);
        peer.transitSyncReplicationState();
      }
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Delegates to {@link ReplicationSourceManager#claimQueue(ReplicationQueueId)}. No peer lock is
   * taken here.
   */
  @Override
  public void claimReplicationQueue(ReplicationQueueId queueId)
    throws ReplicationException, IOException {
    replicationSourceManager.claimQueue(queueId);
  }
}