/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.replication;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
053
054/**
055 * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
056 * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
057 * of the rsZNode. All other znode creation/initialization is handled by the replication state
058 * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the
059 * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode).
060 */
061@Category({ReplicationTests.class, MediumTests.class})
062public class TestReplicationTrackerZKImpl {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066      HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class);
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class);
069
070  private static Configuration conf;
071  private static HBaseTestingUtility utility;
072
073  // Each one of the below variables are reinitialized before every test case
074  private ZKWatcher zkw;
075  private ReplicationPeers rp;
076  private ReplicationTracker rt;
077  private AtomicInteger rsRemovedCount;
078  private String rsRemovedData;
079
080  @BeforeClass
081  public static void setUpBeforeClass() throws Exception {
082    utility = new HBaseTestingUtility();
083    utility.startMiniZKCluster();
084    conf = utility.getConfiguration();
085    ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
086    ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
087  }
088
089  @Before
090  public void setUp() throws Exception {
091    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
092    String fakeRs1 = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode,
093            "hostname1.example.org:1234");
094    try {
095      ZKClusterId.setClusterId(zkw, new ClusterId());
096      rp = ReplicationFactory.getReplicationPeers(zkw, conf);
097      rp.init();
098      rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1),
099        new DummyServer(fakeRs1));
100    } catch (Exception e) {
101      fail("Exception during test setup: " + e);
102    }
103    rsRemovedCount = new AtomicInteger(0);
104    rsRemovedData = "";
105  }
106
107  @AfterClass
108  public static void tearDownAfterClass() throws Exception {
109    utility.shutdownMiniZKCluster();
110  }
111
112  @Test
113  public void testGetListOfRegionServers() throws Exception {
114    // 0 region servers
115    assertEquals(0, rt.getListOfRegionServers().size());
116
117    // 1 region server
118    ZKUtil.createWithParents(zkw,
119      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234"));
120    assertEquals(1, rt.getListOfRegionServers().size());
121
122    // 2 region servers
123    ZKUtil.createWithParents(zkw,
124      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
125    assertEquals(2, rt.getListOfRegionServers().size());
126
127    // 1 region server
128    ZKUtil.deleteNode(zkw,
129      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
130    assertEquals(1, rt.getListOfRegionServers().size());
131
132    // 0 region server
133    ZKUtil.deleteNode(zkw,
134      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234"));
135    assertEquals(0, rt.getListOfRegionServers().size());
136  }
137
138  @Test
139  public void testRegionServerRemovedEvent() throws Exception {
140    ZKUtil.createAndWatch(zkw,
141      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"),
142      HConstants.EMPTY_BYTE_ARRAY);
143    rt.registerListener(new DummyReplicationListener());
144    // delete one
145    ZKUtil.deleteNode(zkw,
146      ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"));
147    // wait for event
148    while (rsRemovedCount.get() < 1) {
149      Thread.sleep(5);
150    }
151    assertEquals("hostname2.example.org:1234", rsRemovedData);
152  }
153
154  @Test
155  public void testPeerNameControl() throws Exception {
156    int exists = 0;
157    rp.getPeerStorage().addPeer("6",
158      ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
159
160    try {
161      rp.getPeerStorage().addPeer("6",
162        ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
163    } catch (ReplicationException e) {
164      if (e.getCause() instanceof KeeperException.NodeExistsException) {
165        exists++;
166      }
167    }
168
169    assertEquals(1, exists);
170
171    // clean up
172    rp.getPeerStorage().removePeer("6");
173  }
174
175  private class DummyReplicationListener implements ReplicationListener {
176
177    @Override
178    public void regionServerRemoved(String regionServer) {
179      rsRemovedData = regionServer;
180      rsRemovedCount.getAndIncrement();
181      LOG.debug("Received regionServerRemoved event: " + regionServer);
182    }
183  }
184
185  private class DummyServer implements Server {
186    private String serverName;
187    private boolean isAborted = false;
188    private boolean isStopped = false;
189
190    public DummyServer(String serverName) {
191      this.serverName = serverName;
192    }
193
194    @Override
195    public Configuration getConfiguration() {
196      return conf;
197    }
198
199    @Override
200    public ZKWatcher getZooKeeper() {
201      return zkw;
202    }
203
204    @Override
205    public CoordinatedStateManager getCoordinatedStateManager() {
206      return null;
207    }
208
209    @Override
210    public ClusterConnection getConnection() {
211      return null;
212    }
213
214    @Override
215    public MetaTableLocator getMetaTableLocator() {
216      return null;
217    }
218
219    @Override
220    public ServerName getServerName() {
221      return ServerName.valueOf(this.serverName);
222    }
223
224    @Override
225    public void abort(String why, Throwable e) {
226      LOG.info("Aborting " + serverName);
227      this.isAborted = true;
228    }
229
230    @Override
231    public boolean isAborted() {
232      return this.isAborted;
233    }
234
235    @Override
236    public void stop(String why) {
237      this.isStopped = true;
238    }
239
240    @Override
241    public boolean isStopped() {
242      return this.isStopped;
243    }
244
245    @Override
246    public ChoreService getChoreService() {
247      return null;
248    }
249
250    @Override
251    public ClusterConnection getClusterConnection() {
252      // TODO Auto-generated method stub
253      return null;
254    }
255
256    @Override
257    public FileSystem getFileSystem() {
258      return null;
259    }
260
261    @Override
262    public boolean isStopping() {
263      return false;
264    }
265
266    @Override
267    public Connection createConnection(Configuration conf) throws IOException {
268      return null;
269    }
270  }
271}