001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.concurrent.locks.Lock;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.regionserver.HRegionServer;
025import org.apache.hadoop.hbase.regionserver.LogRoller;
026import org.apache.hadoop.hbase.replication.ReplicationException;
027import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
028import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
029import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
030import org.apache.hadoop.hbase.replication.ReplicationPeers;
031import org.apache.hadoop.hbase.replication.ReplicationUtils;
032import org.apache.hadoop.hbase.replication.SyncReplicationState;
033import org.apache.hadoop.hbase.util.KeyLocker;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038@InterfaceAudience.Private
039public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
040
041  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
042
043  private final ReplicationSourceManager replicationSourceManager;
044  private final PeerActionListener peerActionListener;
045  private final KeyLocker<String> peersLock = new KeyLocker<>();
046
047  public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager,
048    PeerActionListener peerActionListener) {
049    this.replicationSourceManager = replicationSourceManager;
050    this.peerActionListener = peerActionListener;
051  }
052
053  @Override
054  public void addPeer(String peerId) throws IOException {
055    Lock peerLock = peersLock.acquireLock(peerId);
056    try {
057      replicationSourceManager.addPeer(peerId);
058    } finally {
059      peerLock.unlock();
060    }
061  }
062
063  @Override
064  public void removePeer(String peerId) throws IOException {
065    Lock peerLock = peersLock.acquireLock(peerId);
066    try {
067      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
068        replicationSourceManager.removePeer(peerId);
069      }
070    } finally {
071      peerLock.unlock();
072    }
073  }
074
075  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
076    Lock peerLock = peersLock.acquireLock(peerId);
077    ReplicationPeerImpl peer = null;
078    PeerState oldState = null;
079    boolean success = false;
080    try {
081      peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
082      if (peer == null) {
083        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
084      }
085      oldState = peer.getPeerState();
086      PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
087      // RS need to start work with the new replication state change
088      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
089        replicationSourceManager.refreshSources(peerId);
090      }
091      success = true;
092    } finally {
093      if (!success && peer != null) {
094        // Reset peer state if refresh source failed
095        peer.setPeerState(oldState.equals(PeerState.ENABLED));
096      }
097      peerLock.unlock();
098    }
099  }
100
101  @Override
102  public void enablePeer(String peerId) throws ReplicationException, IOException {
103    refreshPeerState(peerId);
104  }
105
106  @Override
107  public void disablePeer(String peerId) throws ReplicationException, IOException {
108    refreshPeerState(peerId);
109  }
110
111  @Override
112  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
113    Lock peerLock = peersLock.acquireLock(peerId);
114    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
115    ReplicationPeerImpl peer = null;
116    ReplicationPeerConfig oldConfig = null;
117    PeerState oldState = null;
118    boolean success = false;
119    try {
120      peer = peers.getPeer(peerId);
121      if (peer == null) {
122        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
123      }
124      oldConfig = peer.getPeerConfig();
125      oldState = peer.getPeerState();
126      ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
127      // also need to refresh peer state here. When updating a serial replication peer we may
128      // disable it first and then enable it.
129      PeerState newState = peers.refreshPeerState(peerId);
130      // RS need to start work with the new replication config change
131      if (
132        !ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig)
133          || oldConfig.isSerial() != newConfig.isSerial()
134          || (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))
135      ) {
136        replicationSourceManager.refreshSources(peerId);
137      }
138      success = true;
139    } finally {
140      if (!success && peer != null) {
141        // Reset peer config if refresh source failed
142        peer.setPeerConfig(oldConfig);
143        peer.setPeerState(oldState.equals(PeerState.ENABLED));
144      }
145      peerLock.unlock();
146    }
147  }
148
149  @Override
150  public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
151    throws ReplicationException, IOException {
152    ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers();
153    Lock peerLock = peersLock.acquireLock(peerId);
154    try {
155      ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
156      if (peer == null) {
157        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
158      }
159      if (!peer.getPeerConfig().isSyncReplication()) {
160        throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
161      }
162      SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
163      if (stage == 0) {
164        if (newSyncReplicationState != SyncReplicationState.NONE) {
165          LOG.warn("The new sync replication state for peer {} has already been set to {}, "
166            + "this should be a retry, give up", peerId, newSyncReplicationState);
167          return;
168        }
169        // refresh the peer state first, as when we transit to STANDBY, we may need to disable the
170        // peer before processing the sync replication state.
171        PeerState oldState = peer.getPeerState();
172        boolean success = false;
173        try {
174          PeerState newState = replicationPeers.refreshPeerState(peerId);
175          if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
176            replicationSourceManager.refreshSources(peerId);
177          }
178          success = true;
179        } finally {
180          if (!success) {
181            peer.setPeerState(oldState.equals(PeerState.ENABLED));
182          }
183        }
184        newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
185        SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
186        peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
187          newSyncReplicationState, stage);
188      } else {
189        if (newSyncReplicationState == SyncReplicationState.NONE) {
190          LOG.warn(
191            "The new sync replication state for peer {} has already been clear, and the "
192              + "current state is {}, this should be a retry, give up",
193            peerId, newSyncReplicationState);
194          return;
195        }
196        if (newSyncReplicationState == SyncReplicationState.STANDBY) {
197          replicationSourceManager.drainSources(peerId);
198          // Need to roll the wals and make the ReplicationSource for this peer track the new file.
199          // If we do not do this, there will be two problems that can not be addressed at the same
200          // time. First, if we just throw away the current wal file, and later when we transit the
201          // peer to DA, and the wal has not been rolled yet, then the new data written to the wal
202          // file will not be replicated and cause data inconsistency. But if we just track the
203          // current wal file without rolling, it may contains some data before we transit the peer
204          // to S, later if we transit the peer to DA, the data will also be replicated and cause
205          // data inconsistency. So here we need to roll the wal, and let the ReplicationSource
206          // track the new wal file, and throw the old wal files away.
207          LogRoller roller = rs.getWalRoller();
208          roller.requestRollAll();
209          try {
210            roller.waitUntilWalRollFinished();
211          } catch (InterruptedException e) {
212            // reset the interrupted flag
213            Thread.currentThread().interrupt();
214            throw (IOException) new InterruptedIOException(
215              "Interrupted while waiting for wal roll finish").initCause(e);
216          }
217        }
218        SyncReplicationState oldState = peer.getSyncReplicationState();
219        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
220          stage);
221        peer.transitSyncReplicationState();
222      }
223    } finally {
224      peerLock.unlock();
225    }
226  }
227
228  @Override
229  public void claimReplicationQueue(ServerName crashedServer, String queue)
230    throws ReplicationException, IOException {
231    replicationSourceManager.claimQueue(crashedServer, queue);
232  }
233}