001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.util.List;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.HBaseTestingUtility;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.replication.ReplicationFactory;
031import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
032import org.apache.hadoop.hbase.replication.ReplicationQueues;
033import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
034import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
035import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
036import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043
044/**
045 * Tests the ReplicationSourceManager with ReplicationQueueZkImpl's and
046 * ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
047 * TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
048 */
049@Category({ReplicationTests.class, MediumTests.class})
050public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
051
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054      HBaseClassTestRule.forClass(TestReplicationSourceManagerZkImpl.class);
055
056  @BeforeClass
057  public static void setUpBeforeClass() throws Exception {
058    conf = HBaseConfiguration.create();
059    conf.set("replication.replicationsource.implementation",
060      ReplicationSourceDummy.class.getCanonicalName());
061    conf.setLong("replication.sleep.before.failover", 2000);
062    conf.setInt("replication.source.maxretriesmultiplier", 10);
063    utility = new HBaseTestingUtility(conf);
064    utility.startMiniZKCluster();
065    setupZkAndReplication();
066  }
067
068  // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
069  @Test
070  public void testNodeFailoverDeadServerParsing() throws Exception {
071    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
072    ReplicationQueues repQueues =
073      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
074        server.getZooKeeper()));
075    repQueues.init(server.getServerName().toString());
076    // populate some znodes in the peer znode
077    files.add("log1");
078    files.add("log2");
079    for (String file : files) {
080      repQueues.addLog("1", file);
081    }
082
083    // create 3 DummyServers
084    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
085    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
086    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
087
088    // simulate three servers fail sequentially
089    ReplicationQueues rq1 =
090      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
091        s1.getZooKeeper()));
092    rq1.init(s1.getServerName().toString());
093    String serverName = server.getServerName().getServerName();
094    List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
095    rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
096    rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
097    ReplicationQueues rq2 =
098      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
099        s2.getZooKeeper()));
100    rq2.init(s2.getServerName().toString());
101    serverName = s1.getServerName().getServerName();
102    unclaimed = rq2.getUnClaimedQueueIds(serverName);
103    rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
104    rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
105    ReplicationQueues rq3 =
106      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
107        s3.getZooKeeper()));
108    rq3.init(s3.getServerName().toString());
109    serverName = s2.getServerName().getServerName();
110    unclaimed = rq3.getUnClaimedQueueIds(serverName);
111    String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
112    rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
113    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
114    List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
115    // verify
116    assertTrue(result.contains(server.getServerName()));
117    assertTrue(result.contains(s1.getServerName()));
118    assertTrue(result.contains(s2.getServerName()));
119
120    server.stop("");
121  }
122
123  @Test
124  public void testFailoverDeadServerCversionChange() throws Exception {
125    final Server s0 = new DummyServer("cversion-change0.example.org");
126    ReplicationQueues repQueues =
127      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
128        s0.getZooKeeper()));
129    repQueues.init(s0.getServerName().toString());
130    // populate some znodes in the peer znode
131    files.add("log1");
132    files.add("log2");
133    for (String file : files) {
134      repQueues.addLog("1", file);
135    }
136    // simulate queue transfer
137    Server s1 = new DummyServer("cversion-change1.example.org");
138    ReplicationQueues rq1 =
139      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
140        s1.getZooKeeper()));
141    rq1.init(s1.getServerName().toString());
142
143    ReplicationQueuesClientZKImpl client =
144      (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
145        new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
146
147    int v0 = client.getQueuesZNodeCversion();
148    List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName());
149    for(String queue : queues) {
150      rq1.claimQueue(s0.getServerName().getServerName(), queue);
151    }
152    rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName());
153    int v1 = client.getQueuesZNodeCversion();
154    // cversion should increase by 1 since a child node is deleted
155    assertEquals(v0 + 1, v1);
156
157    s0.stop("");
158  }
159}