001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.hbase.ChoreService;
028import org.apache.hadoop.hbase.ClusterId;
029import org.apache.hadoop.hbase.CoordinatedStateManager;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.Server;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.client.ClusterConnection;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
040import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
041import org.apache.hadoop.hbase.zookeeper.ZKConfig;
042import org.apache.hadoop.hbase.zookeeper.ZKUtil;
043import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
044import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
045import org.apache.zookeeper.KeeperException;
046import org.junit.After;
047import org.junit.AfterClass;
048import org.junit.Before;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056@Category({ReplicationTests.class, MediumTests.class})
057public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061      HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
064
065  private static Configuration conf;
066  private static HBaseTestingUtility utility;
067  private static ZKWatcher zkw;
068  private static String replicationZNode;
069  private ReplicationQueuesZKImpl rqZK;
070
071  @BeforeClass
072  public static void setUpBeforeClass() throws Exception {
073    utility = new HBaseTestingUtility();
074    utility.startMiniZKCluster();
075    conf = utility.getConfiguration();
076    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
077    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
078    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
079    replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
080    KEY_ONE = initPeerClusterState("/hbase1");
081    KEY_TWO = initPeerClusterState("/hbase2");
082  }
083
084  private static String initPeerClusterState(String baseZKNode)
085      throws IOException, KeeperException {
086    // Add a dummy region server and set up the cluster id
087    Configuration testConf = new Configuration(conf);
088    testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
089    ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
090    String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
091    ZKUtil.createWithParents(zkw1, fakeRs);
092    ZKClusterId.setClusterId(zkw1, new ClusterId());
093    return ZKConfig.getZooKeeperClusterKey(testConf);
094  }
095
096  @Before
097  @Override
098  public void setUp() {
099    super.setUp();
100    DummyServer ds1 = new DummyServer(server1);
101    DummyServer ds2 = new DummyServer(server2);
102    DummyServer ds3 = new DummyServer(server3);
103    try {
104      rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
105      rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
106      rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
107      rqc = ReplicationFactory.getReplicationQueuesClient(
108        new ReplicationQueuesClientArguments(conf, ds1, zkw));
109    } catch (Exception e) {
110      // This should not occur, because getReplicationQueues() only throws for
111      // TableBasedReplicationQueuesImpl
112      fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
113    }
114    rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
115    OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
116    rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
117  }
118
119  @After
120  public void tearDown() throws KeeperException, IOException {
121    ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
122  }
123
124  @AfterClass
125  public static void tearDownAfterClass() throws Exception {
126    utility.shutdownMiniZKCluster();
127  }
128
129  @Test
130  public void testIsPeerPath_PathToParentOfPeerNode() {
131    assertFalse(rqZK.isPeerPath(rqZK.peersZNode));
132  }
133
134  @Test
135  public void testIsPeerPath_PathToChildOfPeerNode() {
136    String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child");
137    assertFalse(rqZK.isPeerPath(peerChild));
138  }
139
140  @Test
141  public void testIsPeerPath_ActualPeerPath() {
142    String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1");
143    assertTrue(rqZK.isPeerPath(peerPath));
144  }
145
146  static class DummyServer implements Server {
147    private String serverName;
148    private boolean isAborted = false;
149    private boolean isStopped = false;
150
151    public DummyServer(String serverName) {
152      this.serverName = serverName;
153    }
154
155    @Override
156    public Configuration getConfiguration() {
157      return conf;
158    }
159
160    @Override
161    public ZKWatcher getZooKeeper() {
162      return zkw;
163    }
164
165    @Override
166    public CoordinatedStateManager getCoordinatedStateManager() {
167      return null;
168    }
169
170    @Override
171    public ClusterConnection getConnection() {
172      return null;
173    }
174
175    @Override
176    public MetaTableLocator getMetaTableLocator() {
177      return null;
178    }
179
180    @Override
181    public ServerName getServerName() {
182      return ServerName.valueOf(this.serverName);
183    }
184
185    @Override
186    public void abort(String why, Throwable e) {
187      LOG.info("Aborting " + serverName);
188      this.isAborted = true;
189    }
190
191    @Override
192    public boolean isAborted() {
193      return this.isAborted;
194    }
195
196    @Override
197    public void stop(String why) {
198      this.isStopped = true;
199    }
200
201    @Override
202    public boolean isStopped() {
203      return this.isStopped;
204    }
205
206    @Override
207    public ChoreService getChoreService() {
208      return null;
209    }
210
211    @Override
212    public ClusterConnection getClusterConnection() {
213      // TODO Auto-generated method stub
214      return null;
215    }
216
217    @Override
218    public FileSystem getFileSystem() {
219      return null;
220    }
221
222    @Override
223    public boolean isStopping() {
224      return false;
225    }
226
227    @Override
228    public Connection createConnection(Configuration conf) throws IOException {
229      return null;
230    }
231  }
232}