001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
038
039@InterfaceAudience.Private
040public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
041
042  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
043
044  private final ReplicationSourceManager replicationSourceManager;
045  private final PeerActionListener peerActionListener;
046  private final KeyLocker<String> peersLock = new KeyLocker<>();
047
048  public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager,
049    PeerActionListener peerActionListener) {
050    this.replicationSourceManager = replicationSourceManager;
051    this.peerActionListener = peerActionListener;
052  }
053
054  @Override
055  public void addPeer(String peerId) throws IOException {
056    Lock peerLock = peersLock.acquireLock(peerId);
057    try {
058      replicationSourceManager.addPeer(peerId);
059    } finally {
060      peerLock.unlock();
061    }
062  }
063
064  @Override
065  public void removePeer(String peerId) throws IOException {
066    Lock peerLock = peersLock.acquireLock(peerId);
067    try {
068      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
069        replicationSourceManager.removePeer(peerId);
070      }
071    } finally {
072      peerLock.unlock();
073    }
074  }
075
076  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
077    Lock peerLock = peersLock.acquireLock(peerId);
078    ReplicationPeerImpl peer = null;
079    PeerState oldState = null;
080    boolean success = false;
081    try {
082      peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
083      if (peer == null) {
084        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
085      }
086      oldState = peer.getPeerState();
087      PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
088      // RS need to start work with the new replication state change
089      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
090        replicationSourceManager.refreshSources(peerId);
091      }
092      success = true;
093    } finally {
094      if (!success && peer != null) {
095        // Reset peer state if refresh source failed
096        peer.setPeerState(oldState.equals(PeerState.ENABLED));
097      }
098      peerLock.unlock();
099    }
100  }
101
102  @Override
103  public void enablePeer(String peerId) throws ReplicationException, IOException {
104    refreshPeerState(peerId);
105  }
106
107  @Override
108  public void disablePeer(String peerId) throws ReplicationException, IOException {
109    refreshPeerState(peerId);
110  }
111
112  private boolean hasReplicationConfigChange(ReplicationPeerConfig oldConfig,
113    ReplicationPeerConfig newConfig) {
114    Map<String, String> oldReplicationConfigs = oldConfig.getConfiguration();
115    Map<String, String> newReplicationConfigs = newConfig.getConfiguration();
116
117    // Check if any replication.source.* keys have changed values
118    for (Map.Entry<String, String> entry : newReplicationConfigs.entrySet()) {
119      String key = entry.getKey();
120      if (key.startsWith("replication.source.")) {
121        String oldValue = oldReplicationConfigs.get(key);
122        String newValue = entry.getValue();
123        if (!newValue.equals(oldValue)) {
124          return true;
125        }
126      }
127    }
128
129    // Check if any replication.source.* keys were removed
130    for (String key : oldReplicationConfigs.keySet()) {
131      if (key.startsWith("replication.source.") && !newReplicationConfigs.containsKey(key)) {
132        return true;
133      }
134    }
135
136    return false;
137  }
138
139  @Override
140  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
141    Lock peerLock = peersLock.acquireLock(peerId);
142    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
143    ReplicationPeerImpl peer = null;
144    ReplicationPeerConfig oldConfig = null;
145    PeerState oldState = null;
146    boolean success = false;
147    try {
148      peer = peers.getPeer(peerId);
149      if (peer == null) {
150        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
151      }
152      oldConfig = peer.getPeerConfig();
153      oldState = peer.getPeerState();
154      ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
155      // also need to refresh peer state here. When updating a serial replication peer we may
156      // disable it first and then enable it.
157      PeerState newState = peers.refreshPeerState(peerId);
158      // RS need to start work with the new replication config change
159      if (
160        !ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig)
161          || oldConfig.isSerial() != newConfig.isSerial()
162          || hasReplicationConfigChange(oldConfig, newConfig)
163          || (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))
164      ) {
165        replicationSourceManager.refreshSources(peerId);
166      }
167      success = true;
168    } finally {
169      if (!success && peer != null) {
170        // Reset peer config if refresh source failed
171        peer.setPeerConfig(oldConfig);
172        peer.setPeerState(oldState.equals(PeerState.ENABLED));
173      }
174      peerLock.unlock();
175    }
176  }
177
178  @Override
179  public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
180    throws ReplicationException, IOException {
181    ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers();
182    Lock peerLock = peersLock.acquireLock(peerId);
183    try {
184      ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
185      if (peer == null) {
186        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
187      }
188      if (!peer.getPeerConfig().isSyncReplication()) {
189        throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
190      }
191      SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
192      if (stage == 0) {
193        if (newSyncReplicationState != SyncReplicationState.NONE) {
194          LOG.warn("The new sync replication state for peer {} has already been set to {}, "
195            + "this should be a retry, give up", peerId, newSyncReplicationState);
196          return;
197        }
198        // refresh the peer state first, as when we transit to STANDBY, we may need to disable the
199        // peer before processing the sync replication state.
200        PeerState oldState = peer.getPeerState();
201        boolean success = false;
202        try {
203          PeerState newState = replicationPeers.refreshPeerState(peerId);
204          if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
205            replicationSourceManager.refreshSources(peerId);
206          }
207          success = true;
208        } finally {
209          if (!success) {
210            peer.setPeerState(oldState.equals(PeerState.ENABLED));
211          }
212        }
213        newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
214        SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
215        peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
216          newSyncReplicationState, stage);
217      } else {
218        if (newSyncReplicationState == SyncReplicationState.NONE) {
219          LOG.warn(
220            "The new sync replication state for peer {} has already been clear, and the "
221              + "current state is {}, this should be a retry, give up",
222            peerId, newSyncReplicationState);
223          return;
224        }
225        if (newSyncReplicationState == SyncReplicationState.STANDBY) {
226          replicationSourceManager.drainSources(peerId);
227          // Need to roll the wals and make the ReplicationSource for this peer track the new file.
228          // If we do not do this, there will be two problems that can not be addressed at the same
229          // time. First, if we just throw away the current wal file, and later when we transit the
230          // peer to DA, and the wal has not been rolled yet, then the new data written to the wal
231          // file will not be replicated and cause data inconsistency. But if we just track the
232          // current wal file without rolling, it may contains some data before we transit the peer
233          // to S, later if we transit the peer to DA, the data will also be replicated and cause
234          // data inconsistency. So here we need to roll the wal, and let the ReplicationSource
235          // track the new wal file, and throw the old wal files away.
236          LogRoller roller = rs.getWalRoller();
237          roller.requestRollAll();
238          try {
239            roller.waitUntilWalRollFinished();
240          } catch (InterruptedException e) {
241            // reset the interrupted flag
242            Thread.currentThread().interrupt();
243            throw (IOException) new InterruptedIOException(
244              "Interrupted while waiting for wal roll finish").initCause(e);
245          }
246        }
247        SyncReplicationState oldState = peer.getSyncReplicationState();
248        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
249          stage);
250        peer.transitSyncReplicationState();
251      }
252    } finally {
253      peerLock.unlock();
254    }
255  }
256
257  @Override
258  public void claimReplicationQueue(ReplicationQueueId queueId)
259    throws ReplicationException, IOException {
260    replicationSourceManager.claimQueue(queueId);
261  }
262}