001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.ArgumentMatchers.anyBoolean;
025import static org.mockito.ArgumentMatchers.anyString;
026import static org.mockito.Mockito.doAnswer;
027import static org.mockito.Mockito.spy;
028
029import java.io.IOException;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.master.HMaster;
036import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.junit.After;
040import org.junit.AfterClass;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.mockito.invocation.InvocationOnMock;
046
047/**
048 * All the modification method will fail once in the test and should finally succeed.
049 */
050@Category({ ReplicationTests.class, MediumTests.class })
051public class TestReplicationProcedureRetry {
052
053  @ClassRule
054  public static final HBaseClassTestRule CLASS_RULE =
055    HBaseClassTestRule.forClass(TestReplicationProcedureRetry.class);
056
057  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
058
059  @BeforeClass
060  public static void setUp() throws Exception {
061    UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, MockHMaster.class, HMaster.class);
062    UTIL.startMiniCluster(3);
063  }
064
065  @AfterClass
066  public static void tearDown() throws Exception {
067    UTIL.shutdownMiniCluster();
068  }
069
070  @After
071  public void tearDownAfterTest() throws IOException {
072    for (ReplicationPeerDescription desc : UTIL.getAdmin().listReplicationPeers()) {
073      UTIL.getAdmin().removeReplicationPeer(desc.getPeerId());
074    }
075  }
076
077  private void doTest() throws IOException {
078    Admin admin = UTIL.getAdmin();
079    String peerId = "1";
080    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
081      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/hbase2").build();
082    admin.addReplicationPeer(peerId, peerConfig, true);
083
084    assertEquals(peerConfig.getClusterKey(),
085      admin.getReplicationPeerConfig(peerId).getClusterKey());
086    ReplicationPeerConfig newPeerConfig =
087      ReplicationPeerConfig.newBuilder(peerConfig).setBandwidth(123456).build();
088    admin.updateReplicationPeerConfig(peerId, newPeerConfig);
089    assertEquals(newPeerConfig.getBandwidth(),
090      admin.getReplicationPeerConfig(peerId).getBandwidth());
091
092    admin.disableReplicationPeer(peerId);
093    assertFalse(admin.listReplicationPeers().get(0).isEnabled());
094
095    admin.enableReplicationPeer(peerId);
096    assertTrue(admin.listReplicationPeers().get(0).isEnabled());
097
098    admin.removeReplicationPeer(peerId);
099    assertTrue(admin.listReplicationPeers().isEmpty());
100
101    // make sure that we have run into the mocked method
102    MockHMaster master = (MockHMaster) UTIL.getHBaseCluster().getMaster();
103    assertTrue(master.addPeerCalled);
104    assertTrue(master.removePeerCalled);
105    assertTrue(master.updatePeerConfigCalled);
106    assertTrue(master.enablePeerCalled);
107    assertTrue(master.disablePeerCalled);
108  }
109
110  @Test
111  public void testErrorBeforeUpdate() throws IOException, ReplicationException {
112    ((MockHMaster) UTIL.getHBaseCluster().getMaster()).reset(true);
113    doTest();
114  }
115
116  @Test
117  public void testErrorAfterUpdate() throws IOException, ReplicationException {
118    ((MockHMaster) UTIL.getHBaseCluster().getMaster()).reset(false);
119    doTest();
120  }
121
122  public static final class MockHMaster extends HMaster {
123
124    volatile boolean addPeerCalled;
125
126    volatile boolean removePeerCalled;
127
128    volatile boolean updatePeerConfigCalled;
129
130    volatile boolean enablePeerCalled;
131
132    volatile boolean disablePeerCalled;
133
134    private ReplicationPeerManager manager;
135
136    public MockHMaster(Configuration conf) throws IOException {
137      super(conf);
138    }
139
140    private Object invokeWithError(InvocationOnMock invocation, boolean errorBeforeUpdate)
141      throws Throwable {
142      if (errorBeforeUpdate) {
143        throw new ReplicationException("mock error before update");
144      }
145      invocation.callRealMethod();
146      throw new ReplicationException("mock error after update");
147    }
148
149    public void reset(boolean errorBeforeUpdate) throws ReplicationException {
150      addPeerCalled = false;
151      removePeerCalled = false;
152      updatePeerConfigCalled = false;
153      enablePeerCalled = false;
154      disablePeerCalled = false;
155      ReplicationPeerManager m = super.getReplicationPeerManager();
156      manager = spy(m);
157      doAnswer(invocation -> {
158        if (!addPeerCalled) {
159          addPeerCalled = true;
160          return invokeWithError(invocation, errorBeforeUpdate);
161        } else {
162          return invocation.callRealMethod();
163        }
164      }).when(manager).addPeer(anyString(), any(ReplicationPeerConfig.class), anyBoolean());
165      doAnswer(invocation -> {
166        if (!removePeerCalled) {
167          removePeerCalled = true;
168          return invokeWithError(invocation, errorBeforeUpdate);
169        } else {
170          return invocation.callRealMethod();
171        }
172      }).when(manager).removePeer(anyString());
173      doAnswer(invocation -> {
174        if (!updatePeerConfigCalled) {
175          updatePeerConfigCalled = true;
176          return invokeWithError(invocation, errorBeforeUpdate);
177        } else {
178          return invocation.callRealMethod();
179        }
180      }).when(manager).updatePeerConfig(anyString(), any(ReplicationPeerConfig.class));
181      doAnswer(invocation -> {
182        if (!enablePeerCalled) {
183          enablePeerCalled = true;
184          return invokeWithError(invocation, errorBeforeUpdate);
185        } else {
186          return invocation.callRealMethod();
187        }
188      }).when(manager).enablePeer(anyString());
189      doAnswer(invocation -> {
190        if (!disablePeerCalled) {
191          disablePeerCalled = true;
192          return invokeWithError(invocation, errorBeforeUpdate);
193        } else {
194          return invocation.callRealMethod();
195        }
196      }).when(manager).disablePeer(anyString());
197    }
198
199    @Override
200    public ReplicationPeerManager getReplicationPeerManager() {
201      return manager;
202    }
203  }
204}