/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.locks.Lock;
023import org.apache.hadoop.hbase.regionserver.HRegionServer;
024import org.apache.hadoop.hbase.regionserver.LogRoller;
025import org.apache.hadoop.hbase.replication.ReplicationException;
026import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
027import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
028import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
029import org.apache.hadoop.hbase.replication.ReplicationPeers;
030import org.apache.hadoop.hbase.replication.ReplicationUtils;
031import org.apache.hadoop.hbase.replication.SyncReplicationState;
032import org.apache.hadoop.hbase.util.KeyLocker;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037@InterfaceAudience.Private
038public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
039
040  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
041
042  private final ReplicationSourceManager replicationSourceManager;
043  private final PeerActionListener peerActionListener;
044  private final KeyLocker<String> peersLock = new KeyLocker<>();
045
046  public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager,
047      PeerActionListener peerActionListener) {
048    this.replicationSourceManager = replicationSourceManager;
049    this.peerActionListener = peerActionListener;
050  }
051
052  @Override
053  public void addPeer(String peerId) throws IOException {
054    Lock peerLock = peersLock.acquireLock(peerId);
055    try {
056      replicationSourceManager.addPeer(peerId);
057    } finally {
058      peerLock.unlock();
059    }
060  }
061
062  @Override
063  public void removePeer(String peerId) throws IOException {
064    Lock peerLock = peersLock.acquireLock(peerId);
065    try {
066      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
067        replicationSourceManager.removePeer(peerId);
068      }
069    } finally {
070      peerLock.unlock();
071    }
072  }
073
074  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
075    Lock peerLock = peersLock.acquireLock(peerId);
076    ReplicationPeerImpl peer = null;
077    PeerState oldState = null;
078    boolean success = false;
079    try {
080      peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
081      if (peer == null) {
082        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
083      }
084      oldState = peer.getPeerState();
085      PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
086      // RS need to start work with the new replication state change
087      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
088        replicationSourceManager.refreshSources(peerId);
089      }
090      success = true;
091    } finally {
092      if (!success && peer != null) {
093        // Reset peer state if refresh source failed
094        peer.setPeerState(oldState.equals(PeerState.ENABLED));
095      }
096      peerLock.unlock();
097    }
098  }
099
100  @Override
101  public void enablePeer(String peerId) throws ReplicationException, IOException {
102    refreshPeerState(peerId);
103  }
104
105  @Override
106  public void disablePeer(String peerId) throws ReplicationException, IOException {
107    refreshPeerState(peerId);
108  }
109
110  @Override
111  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
112    Lock peerLock = peersLock.acquireLock(peerId);
113    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
114    ReplicationPeerImpl peer = null;
115    ReplicationPeerConfig oldConfig = null;
116    PeerState oldState = null;
117    boolean success = false;
118    try {
119      peer = peers.getPeer(peerId);
120      if (peer == null) {
121        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
122      }
123      oldConfig = peer.getPeerConfig();
124      oldState = peer.getPeerState();
125      ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
126      // also need to refresh peer state here. When updating a serial replication peer we may
127      // disable it first and then enable it.
128      PeerState newState = peers.refreshPeerState(peerId);
129      // RS need to start work with the new replication config change
130      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) ||
131        oldConfig.isSerial() != newConfig.isSerial() ||
132        (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))) {
133        replicationSourceManager.refreshSources(peerId);
134      }
135      success = true;
136    } finally {
137      if (!success && peer != null) {
138        // Reset peer config if refresh source failed
139        peer.setPeerConfig(oldConfig);
140        peer.setPeerState(oldState.equals(PeerState.ENABLED));
141      }
142      peerLock.unlock();
143    }
144  }
145
146  @Override
147  public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
148      throws ReplicationException, IOException {
149    ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers();
150    Lock peerLock = peersLock.acquireLock(peerId);
151    try {
152      ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
153      if (peer == null) {
154        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
155      }
156      if (!peer.getPeerConfig().isSyncReplication()) {
157        throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
158      }
159      SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
160      if (stage == 0) {
161        if (newSyncReplicationState != SyncReplicationState.NONE) {
162          LOG.warn("The new sync replication state for peer {} has already been set to {}, " +
163            "this should be a retry, give up", peerId, newSyncReplicationState);
164          return;
165        }
166        // refresh the peer state first, as when we transit to STANDBY, we may need to disable the
167        // peer before processing the sync replication state.
168        PeerState oldState = peer.getPeerState();
169        boolean success = false;
170        try {
171          PeerState newState = replicationPeers.refreshPeerState(peerId);
172          if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
173            replicationSourceManager.refreshSources(peerId);
174          }
175          success = true;
176        } finally {
177          if (!success) {
178            peer.setPeerState(oldState.equals(PeerState.ENABLED));
179          }
180        }
181        newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
182        SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
183        peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
184          newSyncReplicationState, stage);
185      } else {
186        if (newSyncReplicationState == SyncReplicationState.NONE) {
187          LOG.warn(
188            "The new sync replication state for peer {} has already been clear, and the " +
189              "current state is {}, this should be a retry, give up",
190            peerId, newSyncReplicationState);
191          return;
192        }
193        if (newSyncReplicationState == SyncReplicationState.STANDBY) {
194          replicationSourceManager.drainSources(peerId);
195          // Need to roll the wals and make the ReplicationSource for this peer track the new file.
196          // If we do not do this, there will be two problems that can not be addressed at the same
197          // time. First, if we just throw away the current wal file, and later when we transit the
198          // peer to DA, and the wal has not been rolled yet, then the new data written to the wal
199          // file will not be replicated and cause data inconsistency. But if we just track the
200          // current wal file without rolling, it may contains some data before we transit the peer
201          // to S, later if we transit the peer to DA, the data will also be replicated and cause
202          // data inconsistency. So here we need to roll the wal, and let the ReplicationSource
203          // track the new wal file, and throw the old wal files away.
204          LogRoller roller = rs.getWalRoller();
205          roller.requestRollAll();
206          try {
207            roller.waitUntilWalRollFinished();
208          } catch (InterruptedException e) {
209            // reset the interrupted flag
210            Thread.currentThread().interrupt();
211            throw (IOException) new InterruptedIOException(
212              "Interrupted while waiting for wal roll finish").initCause(e);
213          }
214        }
215        SyncReplicationState oldState = peer.getSyncReplicationState();
216        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
217          stage);
218        peer.transitSyncReplicationState();
219      }
220    } finally {
221      peerLock.unlock();
222    }
223  }
224}