001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.util.List;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.HBaseConfiguration;
025import org.apache.hadoop.hbase.HBaseTestingUtil;
026import org.apache.hadoop.hbase.Server;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
029import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
030import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
031import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.testclassification.ReplicationTests;
034import org.junit.BeforeClass;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038
039/**
040 * Tests the ReplicationSourceManager with ReplicationQueueZkImpl's and
041 * ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
042 * TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
043 */
044@Category({ ReplicationTests.class, MediumTests.class })
045public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049    HBaseClassTestRule.forClass(TestReplicationSourceManagerZkImpl.class);
050
051  @BeforeClass
052  public static void setUpBeforeClass() throws Exception {
053    conf = HBaseConfiguration.create();
054    conf.set("replication.replicationsource.implementation",
055      ReplicationSourceDummy.class.getCanonicalName());
056    conf.setLong("replication.sleep.before.failover", 2000);
057    conf.setInt("replication.source.maxretriesmultiplier", 10);
058    utility = new HBaseTestingUtil(conf);
059    utility.startMiniZKCluster();
060    setupZkAndReplication();
061  }
062
063  // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
064  @Test
065  public void testNodeFailoverDeadServerParsing() throws Exception {
066    Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
067    ReplicationQueueStorage queueStorage =
068      ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
069    // populate some znodes in the peer znode
070    files.add("log1");
071    files.add("log2");
072    for (String file : files) {
073      queueStorage.addWAL(server.getServerName(), "1", file);
074    }
075
076    // create 3 DummyServers
077    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
078    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
079    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
080
081    // simulate three servers fail sequentially
082    ServerName serverName = server.getServerName();
083    List<String> unclaimed = queueStorage.getAllQueues(serverName);
084    queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName());
085    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
086
087    serverName = s1.getServerName();
088    unclaimed = queueStorage.getAllQueues(serverName);
089    queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName());
090    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
091
092    serverName = s2.getServerName();
093    unclaimed = queueStorage.getAllQueues(serverName);
094    String queue3 =
095      queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst();
096    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
097
098    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
099    List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
100    // verify
101    assertTrue(result.contains(server.getServerName()));
102    assertTrue(result.contains(s1.getServerName()));
103    assertTrue(result.contains(s2.getServerName()));
104
105    server.stop("");
106  }
107}