001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.util.List; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseConfiguration; 027import org.apache.hadoop.hbase.HBaseTestingUtility; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.replication.ReplicationFactory; 031import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 032import org.apache.hadoop.hbase.replication.ReplicationQueues; 033import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; 034import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; 035import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; 036import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.ReplicationTests; 039import org.junit.BeforeClass; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043 044/** 045 * Tests the ReplicationSourceManager with ReplicationQueueZkImpl's and 046 * ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in 047 * TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors. 048 */ 049@Category({ReplicationTests.class, MediumTests.class}) 050public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestReplicationSourceManagerZkImpl.class); 055 056 @BeforeClass 057 public static void setUpBeforeClass() throws Exception { 058 conf = HBaseConfiguration.create(); 059 conf.set("replication.replicationsource.implementation", 060 ReplicationSourceDummy.class.getCanonicalName()); 061 conf.setLong("replication.sleep.before.failover", 2000); 062 conf.setInt("replication.source.maxretriesmultiplier", 10); 063 utility = new HBaseTestingUtility(conf); 064 utility.startMiniZKCluster(); 065 setupZkAndReplication(); 066 } 067 068 // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl 069 @Test 070 public void testNodeFailoverDeadServerParsing() throws Exception { 071 final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); 072 ReplicationQueues repQueues = 073 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, 074 server.getZooKeeper())); 075 repQueues.init(server.getServerName().toString()); 076 // populate some znodes in the peer znode 077 files.add("log1"); 078 files.add("log2"); 079 for (String file : files) { 080 repQueues.addLog("1", file); 081 } 082 083 // create 3 DummyServers 084 Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); 085 Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); 086 Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); 087 088 // simulate three servers fail sequentially 089 ReplicationQueues rq1 = 090 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, 091 s1.getZooKeeper())); 092 rq1.init(s1.getServerName().toString()); 093 String serverName = server.getServerName().getServerName(); 094 List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName); 095 rq1.claimQueue(serverName, unclaimed.get(0)).getSecond(); 096 rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); 097 ReplicationQueues rq2 = 098 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2, 099 s2.getZooKeeper())); 100 rq2.init(s2.getServerName().toString()); 101 serverName = s1.getServerName().getServerName(); 102 unclaimed = rq2.getUnClaimedQueueIds(serverName); 103 rq2.claimQueue(serverName, unclaimed.get(0)).getSecond(); 104 rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); 105 ReplicationQueues rq3 = 106 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3, 107 s3.getZooKeeper())); 108 rq3.init(s3.getServerName().toString()); 109 serverName = s2.getServerName().getServerName(); 110 unclaimed = rq3.getUnClaimedQueueIds(serverName); 111 String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst(); 112 rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0)); 113 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3); 114 List<ServerName> result = replicationQueueInfo.getDeadRegionServers(); 115 // verify 116 assertTrue(result.contains(server.getServerName())); 117 assertTrue(result.contains(s1.getServerName())); 118 assertTrue(result.contains(s2.getServerName())); 119 120 server.stop(""); 121 } 122 123 @Test 124 public void testFailoverDeadServerCversionChange() throws Exception { 125 final Server s0 = new DummyServer("cversion-change0.example.org"); 126 ReplicationQueues repQueues = 127 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0, 128 s0.getZooKeeper())); 129 repQueues.init(s0.getServerName().toString()); 130 // populate some znodes in the peer znode 131 files.add("log1"); 132 files.add("log2"); 133 for (String file : files) { 134 repQueues.addLog("1", file); 135 } 136 // simulate queue transfer 137 Server s1 = new DummyServer("cversion-change1.example.org"); 138 ReplicationQueues rq1 = 139 ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, 140 s1.getZooKeeper())); 141 rq1.init(s1.getServerName().toString()); 142 143 ReplicationQueuesClientZKImpl client = 144 (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient( 145 new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); 146 147 int v0 = client.getQueuesZNodeCversion(); 148 List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName()); 149 for(String queue : queues) { 150 rq1.claimQueue(s0.getServerName().getServerName(), queue); 151 } 152 rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName()); 153 int v1 = client.getQueuesZNodeCversion(); 154 // cversion should increase by 1 since a child node is deleted 155 assertEquals(v0 + 1, v1); 156 157 s0.stop(""); 158 } 159}