001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.util.concurrent.atomic.AtomicInteger;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.hbase.ChoreService;
028import org.apache.hadoop.hbase.ClusterId;
029import org.apache.hadoop.hbase.CoordinatedStateManager;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.Server;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.client.ClusterConnection;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
040import org.apache.hadoop.hbase.zookeeper.ZKUtil;
041import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
042import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
043import org.apache.zookeeper.KeeperException;
044import org.junit.AfterClass;
045import org.junit.Before;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
055 * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
056 * of the rsZNode. All other znode creation/initialization is handled by the replication state
057 * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the
058 * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode).
059 */
060@Category({ReplicationTests.class, MediumTests.class})
061public class TestReplicationTrackerZKImpl {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class);
068
069  private static Configuration conf;
070  private static HBaseTestingUtility utility;
071
072  // Each one of the below variables are reinitialized before every test case
073  private ZKWatcher zkw;
074  private ReplicationPeers rp;
075  private ReplicationTracker rt;
076  private AtomicInteger rsRemovedCount;
077  private String rsRemovedData;
078
079  @BeforeClass
080  public static void setUpBeforeClass() throws Exception {
081    utility = new HBaseTestingUtility();
082    utility.startMiniZKCluster();
083    conf = utility.getConfiguration();
084    ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
085    ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
086  }
087
088  @Before
089  public void setUp() throws Exception {
090    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
091    String fakeRs1 = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
092            "hostname1.example.org:1234");
093    try {
094      ZKClusterId.setClusterId(zkw, new ClusterId());
095      rp = ReplicationFactory.getReplicationPeers(zkw, conf);
096      rp.init();
097      rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1),
098        new DummyServer(fakeRs1));
099    } catch (Exception e) {
100      fail("Exception during test setup: " + e);
101    }
102    rsRemovedCount = new AtomicInteger(0);
103    rsRemovedData = "";
104  }
105
106  @AfterClass
107  public static void tearDownAfterClass() throws Exception {
108    utility.shutdownMiniZKCluster();
109  }
110
111  @Test
112  public void testGetListOfRegionServers() throws Exception {
113    // 0 region servers
114    assertEquals(0, rt.getListOfRegionServers().size());
115
116    // 1 region server
117    ZKUtil.createWithParents(zkw,
118      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234"));
119    assertEquals(1, rt.getListOfRegionServers().size());
120
121    // 2 region servers
122    ZKUtil.createWithParents(zkw,
123      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
124    assertEquals(2, rt.getListOfRegionServers().size());
125
126    // 1 region server
127    ZKUtil.deleteNode(zkw,
128      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
129    assertEquals(1, rt.getListOfRegionServers().size());
130
131    // 0 region server
132    ZKUtil.deleteNode(zkw,
133      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234"));
134    assertEquals(0, rt.getListOfRegionServers().size());
135  }
136
137  @Test
138  public void testRegionServerRemovedEvent() throws Exception {
139    ZKUtil.createAndWatch(zkw,
140      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"),
141      HConstants.EMPTY_BYTE_ARRAY);
142    rt.registerListener(new DummyReplicationListener());
143    // delete one
144    ZKUtil.deleteNode(zkw,
145      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
146    // wait for event
147    while (rsRemovedCount.get() < 1) {
148      Thread.sleep(5);
149    }
150    assertEquals("hostname2.example.org:1234", rsRemovedData);
151  }
152
153  @Test
154  public void testPeerNameControl() throws Exception {
155    int exists = 0;
156    rp.getPeerStorage().addPeer("6",
157      ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
158
159    try {
160      rp.getPeerStorage().addPeer("6",
161        ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
162    } catch (ReplicationException e) {
163      if (e.getCause() instanceof KeeperException.NodeExistsException) {
164        exists++;
165      }
166    }
167
168    assertEquals(1, exists);
169
170    // clean up
171    rp.getPeerStorage().removePeer("6");
172  }
173
174  private class DummyReplicationListener implements ReplicationListener {
175
176    @Override
177    public void regionServerRemoved(String regionServer) {
178      rsRemovedData = regionServer;
179      rsRemovedCount.getAndIncrement();
180      LOG.debug("Received regionServerRemoved event: " + regionServer);
181    }
182  }
183
184  private class DummyServer implements Server {
185    private String serverName;
186    private boolean isAborted = false;
187    private boolean isStopped = false;
188
189    public DummyServer(String serverName) {
190      this.serverName = serverName;
191    }
192
193    @Override
194    public Configuration getConfiguration() {
195      return conf;
196    }
197
198    @Override
199    public ZKWatcher getZooKeeper() {
200      return zkw;
201    }
202
203    @Override
204    public CoordinatedStateManager getCoordinatedStateManager() {
205      return null;
206    }
207
208    @Override
209    public ClusterConnection getConnection() {
210      return null;
211    }
212
213    @Override
214    public ServerName getServerName() {
215      return ServerName.valueOf(this.serverName);
216    }
217
218    @Override
219    public void abort(String why, Throwable e) {
220      LOG.info("Aborting " + serverName);
221      this.isAborted = true;
222    }
223
224    @Override
225    public boolean isAborted() {
226      return this.isAborted;
227    }
228
229    @Override
230    public void stop(String why) {
231      this.isStopped = true;
232    }
233
234    @Override
235    public boolean isStopped() {
236      return this.isStopped;
237    }
238
239    @Override
240    public ChoreService getChoreService() {
241      return null;
242    }
243
244    @Override
245    public ClusterConnection getClusterConnection() {
246      // TODO Auto-generated method stub
247      return null;
248    }
249
250    @Override
251    public FileSystem getFileSystem() {
252      return null;
253    }
254
255    @Override
256    public boolean isStopping() {
257      return false;
258    }
259
260    @Override
261    public Connection createConnection(Configuration conf) throws IOException {
262      return null;
263    }
264  }
265}