001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.mockito.ArgumentMatchers.anyString;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.never;
023import static org.mockito.Mockito.times;
024import static org.mockito.Mockito.verify;
025import static org.mockito.Mockito.when;
026
027import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
028import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
029import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
030import org.apache.hadoop.hbase.replication.ReplicationPeers;
031import org.apache.hadoop.hbase.testclassification.ReplicationTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.junit.jupiter.api.BeforeEach;
034import org.junit.jupiter.api.Tag;
035import org.junit.jupiter.api.Test;
036
037@Tag(ReplicationTests.TAG)
038@Tag(SmallTests.TAG)
039public class TestPeerProcedureHandlerImpl {
040
041  private ReplicationSourceManager mockSourceManager;
042  private ReplicationPeers mockReplicationPeers;
043  private ReplicationPeerImpl mockPeer;
044  private PeerProcedureHandlerImpl handler;
045  private static final String PEER_ID = "testPeer";
046
047  @BeforeEach
048  public void setup() throws Exception {
049    mockSourceManager = mock(ReplicationSourceManager.class);
050    mockReplicationPeers = mock(ReplicationPeers.class);
051    mockPeer = mock(ReplicationPeerImpl.class);
052
053    when(mockSourceManager.getReplicationPeers()).thenReturn(mockReplicationPeers);
054    when(mockReplicationPeers.getPeer(PEER_ID)).thenReturn(mockPeer);
055
056    handler = new PeerProcedureHandlerImpl(mockSourceManager, null);
057  }
058
059  @Test
060  public void testReplicationSourceConfigChangeTriggers() throws Exception {
061    ReplicationPeerConfig oldConfig = ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
062      .putConfiguration("replication.source.sleepforretries", "1000").build();
063
064    ReplicationPeerConfig newConfig = ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
065      .putConfiguration("replication.source.sleepforretries", "5000").build();
066
067    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
068    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
069    when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
070    when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
071
072    handler.updatePeerConfig(PEER_ID);
073
074    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
075  }
076
077  @Test
078  public void testNonReplicationSourceConfigDoesNotTrigger() throws Exception {
079    ReplicationPeerConfig oldConfig = ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
080      .putConfiguration("some.other.config", "value1").build();
081
082    ReplicationPeerConfig newConfig = ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
083      .putConfiguration("some.other.config", "value2").build();
084
085    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
086    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
087    when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
088    when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
089
090    handler.updatePeerConfig(PEER_ID);
091
092    verify(mockSourceManager, never()).refreshSources(anyString());
093  }
094
095  @Test
096  public void testNewReplicationSourceConfigTriggers() throws Exception {
097    ReplicationPeerConfig oldConfig =
098      ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster").build();
099
100    ReplicationPeerConfig newConfig = ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
101      .putConfiguration("replication.source.sleepforretries", "5000").build();
102
103    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
104    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
105    when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
106    when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
107
108    handler.updatePeerConfig(PEER_ID);
109
110    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
111  }
112
113  @Test
114  public void testRemovedReplicationSourceConfigTriggers() throws Exception {
115    ReplicationPeerConfig oldConfig = ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
116      .putConfiguration("replication.source.sleepforretries", "2000").build();
117
118    ReplicationPeerConfig newConfig =
119      ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster").build();
120
121    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
122    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
123    when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
124    when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
125
126    handler.updatePeerConfig(PEER_ID);
127
128    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
129  }
130}