001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
055
056/**
057 * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
058 * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
059 * of the rsZNode. All other znode creation/initialization is handled by the replication state
060 * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the
061 * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode).
062 */
063@Category({ReplicationTests.class, MediumTests.class})
064public class TestReplicationTrackerZKImpl {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class);
071
072  private static Configuration conf;
073  private static HBaseTestingUtility utility;
074
075  // Each one of the below variables are reinitialized before every test case
076  private ZKWatcher zkw;
077  private ReplicationPeers rp;
078  private ReplicationTracker rt;
079  private AtomicInteger rsRemovedCount;
080  private String rsRemovedData;
081  private AtomicInteger plChangedCount;
082  private List<String> plChangedData;
083  private AtomicInteger peerRemovedCount;
084  private String peerRemovedData;
085
086  @BeforeClass
087  public static void setUpBeforeClass() throws Exception {
088    utility = new HBaseTestingUtility();
089    utility.startMiniZKCluster();
090    conf = utility.getConfiguration();
091    ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
092    ZKUtil.createWithParents(zk, zk.znodePaths.rsZNode);
093  }
094
095  @Before
096  public void setUp() throws Exception {
097    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
098    String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234");
099    try {
100      ZKClusterId.setClusterId(zkw, new ClusterId());
101      rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
102      rp.init();
103      rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
104    } catch (Exception e) {
105      fail("Exception during test setup: " + e);
106    }
107    rsRemovedCount = new AtomicInteger(0);
108    rsRemovedData = "";
109    plChangedCount = new AtomicInteger(0);
110    plChangedData = new ArrayList<>();
111    peerRemovedCount = new AtomicInteger(0);
112    peerRemovedData = "";
113  }
114
115  @AfterClass
116  public static void tearDownAfterClass() throws Exception {
117    utility.shutdownMiniZKCluster();
118  }
119
120  @Test
121  public void testGetListOfRegionServers() throws Exception {
122    // 0 region servers
123    assertEquals(0, rt.getListOfRegionServers().size());
124
125    // 1 region server
126    ZKUtil.createWithParents(zkw,
127      ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234"));
128    assertEquals(1, rt.getListOfRegionServers().size());
129
130    // 2 region servers
131    ZKUtil.createWithParents(zkw,
132      ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname2.example.org:1234"));
133    assertEquals(2, rt.getListOfRegionServers().size());
134
135    // 1 region server
136    ZKUtil.deleteNode(zkw,
137      ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname2.example.org:1234"));
138    assertEquals(1, rt.getListOfRegionServers().size());
139
140    // 0 region server
141    ZKUtil.deleteNode(zkw,
142      ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234"));
143    assertEquals(0, rt.getListOfRegionServers().size());
144  }
145
146  @Test
147  public void testRegionServerRemovedEvent() throws Exception {
148    ZKUtil.createAndWatch(zkw,
149      ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname2.example.org:1234"),
150      HConstants.EMPTY_BYTE_ARRAY);
151    rt.registerListener(new DummyReplicationListener());
152    // delete one
153    ZKUtil.deleteNode(zkw,
154      ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname2.example.org:1234"));
155    // wait for event
156    while (rsRemovedCount.get() < 1) {
157      Thread.sleep(5);
158    }
159    assertEquals("hostname2.example.org:1234", rsRemovedData);
160  }
161
162  @Test
163  public void testPeerRemovedEvent() throws Exception {
164    rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
165    rt.registerListener(new DummyReplicationListener());
166    rp.unregisterPeer("5");
167    // wait for event
168    while (peerRemovedCount.get() < 1) {
169      Thread.sleep(5);
170    }
171    assertEquals("5", peerRemovedData);
172  }
173
174  @Test
175  public void testPeerListChangedEvent() throws Exception {
176    // add a peer
177    rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
178    zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
179    rt.registerListener(new DummyReplicationListener());
180    rp.disablePeer("5");
181    int tmp = plChangedCount.get();
182    LOG.info("Peer count=" + tmp);
183    ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
184    // wait for event
185    while (plChangedCount.get() <= tmp) {
186      Thread.sleep(100);
187      LOG.info("Peer count=" + tmp);
188    }
189    assertEquals(1, plChangedData.size());
190    assertTrue(plChangedData.contains("5"));
191
192    // clean up
193    //ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
194    rp.unregisterPeer("5");
195  }
196
197  @Test
198  public void testPeerNameControl() throws Exception {
199    int exists = 0;
200    int hyphen = 0;
201    rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
202
203    try{
204      rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
205    }catch(IllegalArgumentException e){
206      exists++;
207    }
208
209    try{
210      rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
211    }catch(IllegalArgumentException e){
212      hyphen++;
213    }
214    assertEquals(1, exists);
215    assertEquals(1, hyphen);
216
217    // clean up
218    rp.unregisterPeer("6");
219  }
220
221  private class DummyReplicationListener implements ReplicationListener {
222
223    @Override
224    public void regionServerRemoved(String regionServer) {
225      rsRemovedData = regionServer;
226      rsRemovedCount.getAndIncrement();
227      LOG.debug("Received regionServerRemoved event: " + regionServer);
228    }
229
230    @Override
231    public void peerRemoved(String peerId) {
232      peerRemovedData = peerId;
233      peerRemovedCount.getAndIncrement();
234      LOG.debug("Received peerDisconnected event: " + peerId);
235    }
236
237    @Override
238    public void peerListChanged(List<String> peerIds) {
239      plChangedData.clear();
240      plChangedData.addAll(peerIds);
241      int count = plChangedCount.getAndIncrement();
242      LOG.debug("Received peerListChanged event " + count);
243    }
244  }
245
246  private class DummyServer implements Server {
247    private String serverName;
248    private boolean isAborted = false;
249    private boolean isStopped = false;
250
251    public DummyServer(String serverName) {
252      this.serverName = serverName;
253    }
254
255    @Override
256    public Configuration getConfiguration() {
257      return conf;
258    }
259
260    @Override
261    public ZKWatcher getZooKeeper() {
262      return zkw;
263    }
264
265    @Override
266    public CoordinatedStateManager getCoordinatedStateManager() {
267      return null;
268    }
269
270    @Override
271    public ClusterConnection getConnection() {
272      return null;
273    }
274
275    @Override
276    public MetaTableLocator getMetaTableLocator() {
277      return null;
278    }
279
280    @Override
281    public ServerName getServerName() {
282      return ServerName.valueOf(this.serverName);
283    }
284
285    @Override
286    public void abort(String why, Throwable e) {
287      LOG.info("Aborting " + serverName);
288      this.isAborted = true;
289    }
290
291    @Override
292    public boolean isAborted() {
293      return this.isAborted;
294    }
295
296    @Override
297    public void stop(String why) {
298      this.isStopped = true;
299    }
300
301    @Override
302    public boolean isStopped() {
303      return this.isStopped;
304    }
305
306    @Override
307    public ChoreService getChoreService() {
308      return null;
309    }
310
311    @Override
312    public ClusterConnection getClusterConnection() {
313      // TODO Auto-generated method stub
314      return null;
315    }
316
317    @Override
318    public FileSystem getFileSystem() {
319      return null;
320    }
321
322    @Override
323    public boolean isStopping() {
324      return false;
325    }
326
327    @Override
328    public Connection createConnection(Configuration conf) throws IOException {
329      return null;
330    }
331  }
332}