001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.util.concurrent.Semaphore;
028import java.util.concurrent.ThreadLocalRandom;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseZKTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.testclassification.ZKTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.zookeeper.CreateMode;
039import org.apache.zookeeper.ZooDefs.Ids;
040import org.apache.zookeeper.ZooKeeper;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049@Category({ ZKTests.class, MediumTests.class })
050public class TestZKNodeTracker {
051
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054      HBaseClassTestRule.forClass(TestZKNodeTracker.class);
055
056  private static final Logger LOG = LoggerFactory.getLogger(TestZKNodeTracker.class);
057  private final static HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility();
058
  /**
   * Starts the mini ZooKeeper cluster once, before any test in this class runs.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
  }
063
  /**
   * Shuts the mini ZooKeeper cluster down once, after all tests in this class have run.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniZKCluster();
  }
068
069  /**
070   * Test that we can interrupt a node that is blocked on a wait.
071   */
072  @Test
073  public void testInterruptible() throws IOException, InterruptedException {
074    Abortable abortable = new StubAbortable();
075    ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable);
076    final TestTracker tracker = new TestTracker(zk, "/xyz", abortable);
077    tracker.start();
078    Thread t = new Thread() {
079      @Override
080      public void run() {
081        try {
082          tracker.blockUntilAvailable();
083        } catch (InterruptedException e) {
084          throw new RuntimeException("Interrupted", e);
085        }
086      }
087    };
088    t.start();
089    while (!t.isAlive()) {
090      Threads.sleep(1);
091    }
092    tracker.stop();
093    t.join();
094    // If it wasn't interruptible, we'd never get to here.
095  }
096
097  @Test
098  public void testNodeTracker() throws Exception {
099    Abortable abortable = new StubAbortable();
100    ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
101        "testNodeTracker", abortable);
102    ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode);
103
104    final String node = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
105      Long.toString(ThreadLocalRandom.current().nextLong()));
106
107    final byte [] dataOne = Bytes.toBytes("dataOne");
108    final byte [] dataTwo = Bytes.toBytes("dataTwo");
109
110    // Start a ZKNT with no node currently available
111    TestTracker localTracker = new TestTracker(zk, node, abortable);
112    localTracker.start();
113    zk.registerListener(localTracker);
114
115    // Make sure we don't have a node
116    assertNull(localTracker.getData(false));
117
118    // Spin up a thread with another ZKNT and have it block
119    WaitToGetDataThread thread = new WaitToGetDataThread(zk, node);
120    thread.start();
121
122    // Verify the thread doesn't have a node
123    assertFalse(thread.hasData);
124
125    // Now, start a new ZKNT with the node already available
126    TestTracker secondTracker = new TestTracker(zk, node, null);
127    secondTracker.start();
128    zk.registerListener(secondTracker);
129
130    // Put up an additional zk listener so we know when zk event is done
131    TestingZKListener zkListener = new TestingZKListener(zk, node);
132    zk.registerListener(zkListener);
133    assertEquals(0, zkListener.createdLock.availablePermits());
134
135    // Create a completely separate zk connection for test triggers and avoid
136    // any weird watcher interactions from the test
137    final ZooKeeper zkconn = ZooKeeperHelper.
138        getConnectedZooKeeper(ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()),
139            60000);
140
141    // Add the node with data one
142    zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
143
144    // Wait for the zk event to be processed
145    zkListener.waitForCreation();
146    thread.join();
147
148    // Both trackers should have the node available with data one
149    assertNotNull(localTracker.getData(false));
150    assertNotNull(localTracker.blockUntilAvailable());
151    assertTrue(Bytes.equals(localTracker.getData(false), dataOne));
152    assertTrue(thread.hasData);
153    assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne));
154    LOG.info("Successfully got data one");
155
156    // Make sure it's available and with the expected data
157    assertNotNull(secondTracker.getData(false));
158    assertNotNull(secondTracker.blockUntilAvailable());
159    assertTrue(Bytes.equals(secondTracker.getData(false), dataOne));
160    LOG.info("Successfully got data one with the second tracker");
161
162    // Drop the node
163    zkconn.delete(node, -1);
164    zkListener.waitForDeletion();
165
166    // Create a new thread but with the existing thread's tracker to wait
167    TestTracker threadTracker = thread.tracker;
168    thread = new WaitToGetDataThread(zk, node, threadTracker);
169    thread.start();
170
171    // Verify other guys don't have data
172    assertFalse(thread.hasData);
173    assertNull(secondTracker.getData(false));
174    assertNull(localTracker.getData(false));
175    LOG.info("Successfully made unavailable");
176
177    // Create with second data
178    zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
179
180    // Wait for the zk event to be processed
181    zkListener.waitForCreation();
182    thread.join();
183
184    // All trackers should have the node available with data two
185    assertNotNull(localTracker.getData(false));
186    assertNotNull(localTracker.blockUntilAvailable());
187    assertTrue(Bytes.equals(localTracker.getData(false), dataTwo));
188    assertNotNull(secondTracker.getData(false));
189    assertNotNull(secondTracker.blockUntilAvailable());
190    assertTrue(Bytes.equals(secondTracker.getData(false), dataTwo));
191    assertTrue(thread.hasData);
192    assertTrue(Bytes.equals(thread.tracker.getData(false), dataTwo));
193    LOG.info("Successfully got data two on all trackers and threads");
194
195    // Change the data back to data one
196    zkconn.setData(node, dataOne, -1);
197
198    // Wait for zk event to be processed
199    zkListener.waitForDataChange();
200
201    // All trackers should have the node available with data one
202    assertNotNull(localTracker.getData(false));
203    assertNotNull(localTracker.blockUntilAvailable());
204    assertTrue(Bytes.equals(localTracker.getData(false), dataOne));
205    assertNotNull(secondTracker.getData(false));
206    assertNotNull(secondTracker.blockUntilAvailable());
207    assertTrue(Bytes.equals(secondTracker.getData(false), dataOne));
208    assertTrue(thread.hasData);
209    assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne));
210    LOG.info("Successfully got data one following a data change on all trackers and threads");
211  }
212
213  public static class WaitToGetDataThread extends Thread {
214
215    TestTracker tracker;
216    boolean hasData;
217
218    public WaitToGetDataThread(ZKWatcher zk, String node) {
219      tracker = new TestTracker(zk, node, null);
220      tracker.start();
221      zk.registerListener(tracker);
222      hasData = false;
223    }
224
225    public WaitToGetDataThread(ZKWatcher zk, String node,
226                               TestTracker tracker) {
227      this.tracker = tracker;
228      hasData = false;
229    }
230
231    @Override
232    public void run() {
233      LOG.info("Waiting for data to be available in WaitToGetDataThread");
234      try {
235        tracker.blockUntilAvailable();
236      } catch (InterruptedException e) {
237        e.printStackTrace();
238      }
239      LOG.info("Data now available in tracker from WaitToGetDataThread");
240      hasData = true;
241    }
242  }
243
244  public static class TestTracker extends ZKNodeTracker {
245    public TestTracker(ZKWatcher watcher, String node,
246                       Abortable abortable) {
247      super(watcher, node, abortable);
248    }
249  }
250
251  public static class TestingZKListener extends ZKListener {
252    private static final Logger LOG = LoggerFactory.getLogger(TestingZKListener.class);
253
254    private Semaphore deletedLock;
255    private Semaphore createdLock;
256    private Semaphore changedLock;
257    private String node;
258
259    public TestingZKListener(ZKWatcher watcher, String node) {
260      super(watcher);
261      deletedLock = new Semaphore(0);
262      createdLock = new Semaphore(0);
263      changedLock = new Semaphore(0);
264      this.node = node;
265    }
266
267    @Override
268    public void nodeDeleted(String path) {
269      if(path.equals(node)) {
270        LOG.debug("nodeDeleted(" + path + ")");
271        deletedLock.release();
272      }
273    }
274
275    @Override
276    public void nodeCreated(String path) {
277      if(path.equals(node)) {
278        LOG.debug("nodeCreated(" + path + ")");
279        createdLock.release();
280      }
281    }
282
283    @Override
284    public void nodeDataChanged(String path) {
285      if(path.equals(node)) {
286        LOG.debug("nodeDataChanged(" + path + ")");
287        changedLock.release();
288      }
289    }
290
291    public void waitForDeletion() throws InterruptedException {
292      deletedLock.acquire();
293    }
294
295    public void waitForCreation() throws InterruptedException {
296      createdLock.acquire();
297    }
298
299    public void waitForDataChange() throws InterruptedException {
300      changedLock.acquire();
301    }
302  }
303
304  public static class StubAbortable implements Abortable {
305    @Override
306    public void abort(final String msg, final Throwable t) {}
307
308    @Override
309    public boolean isAborted() {
310      return false;
311    }
312  }
313
314  @Test
315  public void testCleanZNode() throws Exception {
316    ZKWatcher zkw = new ZKWatcher(TEST_UTIL.getConfiguration(),
317        "testNodeTracker", new TestZKNodeTracker.StubAbortable());
318
319    final ServerName sn = ServerName.valueOf("127.0.0.1:52", 45L);
320
321    ZKUtil.createAndFailSilent(zkw,
322        TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT,
323            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
324
325    final String nodeName =  zkw.getZNodePaths().masterAddressZNode;
326
327    // Check that we manage the case when there is no data
328    ZKUtil.createAndFailSilent(zkw, nodeName);
329    MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
330    assertNotNull(ZKUtil.getData(zkw, nodeName));
331
332    // Check that we don't delete if we're not supposed to
333    ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0));
334    MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString());
335    assertNotNull(ZKUtil.getData(zkw, nodeName));
336
337    // Check that we delete when we're supposed to
338    ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn, 0));
339    MasterAddressTracker.deleteIfEquals(zkw, sn.toString());
340    assertNull(ZKUtil.getData(zkw, nodeName));
341
342    // Check that we support the case when the znode does not exist
343    MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); // must not throw an exception
344  }
345
346}