001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import org.apache.hadoop.hbase.ServerName; 026import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 027import org.apache.hadoop.hbase.zookeeper.ZKConfig; 028import org.junit.Test; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * White box testing for replication state interfaces. Implementations should extend this class, and 034 * initialize the interfaces properly. 035 */ 036public abstract class TestReplicationStateBasic { 037 038 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); 039 040 protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); 041 protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); 042 protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); 043 protected ReplicationPeers rp; 044 protected static final String ID_ONE = "1"; 045 protected static final String ID_TWO = "2"; 046 protected static String KEY_ONE; 047 protected static String KEY_TWO; 048 049 // For testing when we try to replicate to ourself 050 protected String OUR_KEY; 051 052 protected static int zkTimeoutCount; 053 protected static final int ZK_MAX_COUNT = 300; 054 protected static final int ZK_SLEEP_INTERVAL = 100; // millis 055 056 @Test 057 public void testReplicationPeers() throws Exception { 058 rp.init(); 059 060 try { 061 rp.getPeerStorage().setPeerState("bogus", true); 062 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 063 } catch (ReplicationException e) { 064 } 065 try { 066 rp.getPeerStorage().setPeerState("bogus", false); 067 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 068 } catch (ReplicationException e) { 069 } 070 071 try { 072 assertFalse(rp.addPeer("bogus")); 073 fail("Should have thrown an ReplicationException when passed a bogus peerId"); 074 } catch (ReplicationException e) { 075 } 076 077 assertNumberOfPeers(0); 078 079 // Add some peers 080 rp.getPeerStorage().addPeer(ID_ONE, 081 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, 082 SyncReplicationState.NONE); 083 assertNumberOfPeers(1); 084 rp.getPeerStorage().addPeer(ID_TWO, 085 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true, 086 SyncReplicationState.NONE); 087 assertNumberOfPeers(2); 088 089 assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils 090 .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); 091 rp.getPeerStorage().removePeer(ID_ONE); 092 rp.removePeer(ID_ONE); 093 assertNumberOfPeers(1); 094 095 // Add one peer 096 rp.getPeerStorage().addPeer(ID_ONE, 097 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true, 098 SyncReplicationState.NONE); 099 rp.addPeer(ID_ONE); 100 assertNumberOfPeers(2); 101 assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); 102 rp.getPeerStorage().setPeerState(ID_ONE, false); 103 // now we do not rely on zk watcher to trigger the state change so we need to trigger it 104 // manually... 105 ReplicationPeerImpl peer = rp.getPeer(ID_ONE); 106 rp.refreshPeerState(peer.getId()); 107 assertEquals(PeerState.DISABLED, peer.getPeerState()); 108 assertConnectedPeerStatus(false, ID_ONE); 109 rp.getPeerStorage().setPeerState(ID_ONE, true); 110 // now we do not rely on zk watcher to trigger the state change so we need to trigger it 111 // manually... 112 rp.refreshPeerState(peer.getId()); 113 assertEquals(PeerState.ENABLED, peer.getPeerState()); 114 assertConnectedPeerStatus(true, ID_ONE); 115 116 // Disconnect peer 117 rp.removePeer(ID_ONE); 118 assertNumberOfPeers(2); 119 } 120 121 private void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { 122 // we can first check if the value was changed in the store, if it wasn't then fail right away 123 if (status != rp.getPeerStorage().isPeerEnabled(peerId)) { 124 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); 125 } 126 while (true) { 127 if (status == rp.getPeer(peerId).isPeerEnabled()) { 128 return; 129 } 130 if (zkTimeoutCount < ZK_MAX_COUNT) { 131 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status 132 + ", sleeping and trying again."); 133 Thread.sleep(ZK_SLEEP_INTERVAL); 134 } else { 135 fail("Timed out waiting for ConnectedPeerStatus to be " + status); 136 } 137 } 138 } 139 140 private void assertNumberOfPeers(int total) throws ReplicationException { 141 assertEquals(total, rp.getPeerStorage().listPeerIds().size()); 142 } 143}