001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
027import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
028import org.apache.hadoop.hbase.zookeeper.ZKConfig;
029import org.junit.Test;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * White box testing for replication state interfaces. Implementations should extend this class, and
035 * initialize the interfaces properly.
036 */
037public abstract class TestReplicationStateBasic {
038
039  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
040
041  protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
042  protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
043  protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
044  protected ReplicationPeers rp;
045  protected static final String ID_ONE = "1";
046  protected static final String ID_TWO = "2";
047  protected static String KEY_ONE;
048  protected static String KEY_TWO;
049
050  // For testing when we try to replicate to ourself
051  protected String OUR_KEY;
052
053  protected static int zkTimeoutCount;
054  protected static final int ZK_MAX_COUNT = 300;
055  protected static final int ZK_SLEEP_INTERVAL = 100; // millis
056
057  @Test
058  public void testReplicationPeers() throws Exception {
059    rp.init();
060
061    try {
062      rp.getPeerStorage().setPeerState("bogus", true);
063      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
064    } catch (ReplicationException e) {
065    }
066    try {
067      rp.getPeerStorage().setPeerState("bogus", false);
068      fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
069    } catch (ReplicationException e) {
070    }
071
072    try {
073      assertFalse(rp.addPeer("bogus"));
074      fail("Should have thrown an ReplicationException when passed a bogus peerId");
075    } catch (ReplicationException e) {
076    }
077
078    assertNumberOfPeers(0);
079
080    // Add some peers
081    rp.getPeerStorage().addPeer(ID_ONE,
082      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
083      SyncReplicationState.NONE);
084    assertNumberOfPeers(1);
085    rp.getPeerStorage().addPeer(ID_TWO,
086      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true,
087      SyncReplicationState.NONE);
088    assertNumberOfPeers(2);
089
090    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeerConfigUtil
091      .getPeerClusterConfiguration(rp.getConf(), rp.getPeerStorage().getPeerConfig(ID_ONE))));
092    rp.getPeerStorage().removePeer(ID_ONE);
093    rp.removePeer(ID_ONE);
094    assertNumberOfPeers(1);
095
096    // Add one peer
097    rp.getPeerStorage().addPeer(ID_ONE,
098      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
099      SyncReplicationState.NONE);
100    rp.addPeer(ID_ONE);
101    assertNumberOfPeers(2);
102    assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
103    rp.getPeerStorage().setPeerState(ID_ONE, false);
104    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
105    // manually...
106    ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
107    rp.refreshPeerState(peer.getId());
108    assertEquals(PeerState.DISABLED, peer.getPeerState());
109    assertConnectedPeerStatus(false, ID_ONE);
110    rp.getPeerStorage().setPeerState(ID_ONE, true);
111    // now we do not rely on zk watcher to trigger the state change so we need to trigger it
112    // manually...
113    rp.refreshPeerState(peer.getId());
114    assertEquals(PeerState.ENABLED, peer.getPeerState());
115    assertConnectedPeerStatus(true, ID_ONE);
116
117    // Disconnect peer
118    rp.removePeer(ID_ONE);
119    assertNumberOfPeers(2);
120  }
121
122  private void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
123    // we can first check if the value was changed in the store, if it wasn't then fail right away
124    if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
125      fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
126    }
127    while (true) {
128      if (status == rp.getPeer(peerId).isPeerEnabled()) {
129        return;
130      }
131      if (zkTimeoutCount < ZK_MAX_COUNT) {
132        LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
133          + ", sleeping and trying again.");
134        Thread.sleep(ZK_SLEEP_INTERVAL);
135      } else {
136        fail("Timed out waiting for ConnectedPeerStatus to be " + status);
137      }
138    }
139  }
140
141  private void assertNumberOfPeers(int total) throws ReplicationException {
142    assertEquals(total, rp.getPeerStorage().listPeerIds().size());
143  }
144}