001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
027import org.apache.hadoop.hbase.zookeeper.ZKConfig;
028import org.junit.Test;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * White box testing for replication state interfaces. Implementations should extend this class, and
034 * initialize the interfaces properly.
035 */
036public abstract class TestReplicationStateBasic {
037
038  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
039
040  protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
041  protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
042  protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
043  protected ReplicationPeers rp;
044  protected static final String ID_ONE = "1";
045  protected static final String ID_TWO = "2";
046  protected static String KEY_ONE;
047  protected static String KEY_TWO;
048
049  // For testing when we try to replicate to ourself
050  protected String OUR_KEY;
051
052  protected static int zkTimeoutCount;
053  protected static final int ZK_MAX_COUNT = 300;
054  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
055
056  @Test
057  public void testReplicationPeers() throws Exception {
058    rp.init();
059
060    try {
061      rp.getPeerStorage().setPeerState("bogus", true);
062      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
063    } catch (ReplicationException e) {
064    }
065    try {
066      rp.getPeerStorage().setPeerState("bogus", false);
067      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
068    } catch (ReplicationException e) {
069    }
070
071    try {
072      assertFalse(rp.addPeer("bogus"));
073      fail("Should have thrown an ReplicationException when passed a bogus peerId");
074    } catch (ReplicationException e) {
075    }
076
077    assertNumberOfPeers(0);
078
079    // Add some peers
080    rp.getPeerStorage().addPeer(ID_ONE,
081      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
082      SyncReplicationState.NONE);
083    assertNumberOfPeers(1);
084    rp.getPeerStorage().addPeer(ID_TWO,
085      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true,
086      SyncReplicationState.NONE);
087    assertNumberOfPeers(2);
088
089    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
090      .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
091    rp.getPeerStorage().removePeer(ID_ONE);
092    rp.removePeer(ID_ONE);
093    assertNumberOfPeers(1);
094
095    // Add one peer
096    rp.getPeerStorage().addPeer(ID_ONE,
097      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
098      SyncReplicationState.NONE);
099    rp.addPeer(ID_ONE);
100    assertNumberOfPeers(2);
101    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
102    rp.getPeerStorage().setPeerState(ID_ONE, false);
103    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
104    // manually...
105    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
106    rp.refreshPeerState(peer.getId());
107    assertEquals(PeerState.DISABLED, peer.getPeerState());
108    assertConnectedPeerStatus(false, ID_ONE);
109    rp.getPeerStorage().setPeerState(ID_ONE, true);
110    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
111    // manually...
112    rp.refreshPeerState(peer.getId());
113    assertEquals(PeerState.ENABLED, peer.getPeerState());
114    assertConnectedPeerStatus(true, ID_ONE);
115
116    // Disconnect peer
117    rp.removePeer(ID_ONE);
118    assertNumberOfPeers(2);
119  }
120
121  private void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
122    // we can first check if the value was changed in the store, if it wasn't then fail right away
123    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
124      fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
125    }
126    while (true) {
127      if (status == rp.getPeer(peerId).isPeerEnabled()) {
128        return;
129      }
130      if (zkTimeoutCount < ZK_MAX_COUNT) {
131        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
132          + ", sleeping and trying again.");
133        Thread.sleep(ZK_SLEEP_INTERVAL);
134      } else {
135        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
136      }
137    }
138  }
139
140  private void assertNumberOfPeers(int total) throws ReplicationException {
141    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
142  }
143}