001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.util.concurrent.Semaphore; 028import java.util.concurrent.ThreadLocalRandom; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseZKTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.testclassification.ZKTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.zookeeper.CreateMode; 039import org.apache.zookeeper.ZooDefs.Ids; 040import org.apache.zookeeper.ZooKeeper; 041import org.junit.AfterClass; 042import org.junit.BeforeClass; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049@Category({ ZKTests.class, MediumTests.class }) 050public class TestZKNodeTracker { 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestZKNodeTracker.class); 054 055 private static final Logger LOG = LoggerFactory.getLogger(TestZKNodeTracker.class); 056 private final static HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); 057 058 @BeforeClass 059 public static void setUpBeforeClass() throws Exception { 060 TEST_UTIL.startMiniZKCluster(); 061 } 062 063 @AfterClass 064 public static void tearDownAfterClass() throws Exception { 065 TEST_UTIL.shutdownMiniZKCluster(); 066 } 067 068 /** 069 * Test that we can interrupt a node that is blocked on a wait. 070 */ 071 @Test 072 public void testInterruptible() throws IOException, InterruptedException { 073 Abortable abortable = new StubAbortable(); 074 ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable); 075 final TestTracker tracker = new TestTracker(zk, "/xyz", abortable); 076 tracker.start(); 077 Thread t = new Thread(() -> { 078 try { 079 tracker.blockUntilAvailable(); 080 } catch (InterruptedException e) { 081 throw new RuntimeException("Interrupted", e); 082 } 083 }); 084 t.start(); 085 while (!t.isAlive()) { 086 Threads.sleep(1); 087 } 088 tracker.stop(); 089 t.join(); 090 // If it wasn't interruptible, we'd never get to here. 091 } 092 093 @Test 094 public void testNodeTracker() throws Exception { 095 Abortable abortable = new StubAbortable(); 096 ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), "testNodeTracker", abortable); 097 ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode); 098 099 final String node = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, 100 Long.toString(ThreadLocalRandom.current().nextLong())); 101 102 final byte[] dataOne = Bytes.toBytes("dataOne"); 103 final byte[] dataTwo = Bytes.toBytes("dataTwo"); 104 105 // Start a ZKNT with no node currently available 106 TestTracker localTracker = new TestTracker(zk, node, abortable); 107 localTracker.start(); 108 zk.registerListener(localTracker); 109 110 // Make sure we don't have a node 111 assertNull(localTracker.getData(false)); 112 113 // Spin up a thread with another ZKNT and have it block 114 WaitToGetDataThread thread = new WaitToGetDataThread(zk, node); 115 thread.start(); 116 117 // Verify the thread doesn't have a node 118 assertFalse(thread.hasData); 119 120 // Now, start a new ZKNT with the node already available 121 TestTracker secondTracker = new TestTracker(zk, node, null); 122 secondTracker.start(); 123 zk.registerListener(secondTracker); 124 125 // Put up an additional zk listener so we know when zk event is done 126 TestingZKListener zkListener = new TestingZKListener(zk, node); 127 zk.registerListener(zkListener); 128 assertEquals(0, zkListener.createdLock.availablePermits()); 129 130 // Create a completely separate zk connection for test triggers and avoid 131 // any weird watcher interactions from the test 132 final ZooKeeper zkconn = ZooKeeperHelper.getConnectedZooKeeper( 133 ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()), 60000); 134 135 // Add the node with data one 136 zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 137 138 // Wait for the zk event to be processed 139 zkListener.waitForCreation(); 140 thread.join(); 141 142 // Both trackers should have the node available with data one 143 assertNotNull(localTracker.getData(false)); 144 assertNotNull(localTracker.blockUntilAvailable()); 145 assertTrue(Bytes.equals(localTracker.getData(false), dataOne)); 146 assertTrue(thread.hasData); 147 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne)); 148 LOG.info("Successfully got data one"); 149 150 // Make sure it's available and with the expected data 151 assertNotNull(secondTracker.getData(false)); 152 assertNotNull(secondTracker.blockUntilAvailable()); 153 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne)); 154 LOG.info("Successfully got data one with the second tracker"); 155 156 // Drop the node 157 zkconn.delete(node, -1); 158 zkListener.waitForDeletion(); 159 160 // Create a new thread but with the existing thread's tracker to wait 161 TestTracker threadTracker = thread.tracker; 162 thread = new WaitToGetDataThread(threadTracker); 163 thread.start(); 164 165 // Verify other guys don't have data 166 assertFalse(thread.hasData); 167 assertNull(secondTracker.getData(false)); 168 assertNull(localTracker.getData(false)); 169 LOG.info("Successfully made unavailable"); 170 171 // Create with second data 172 zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 173 174 // Wait for the zk event to be processed 175 zkListener.waitForCreation(); 176 thread.join(); 177 178 // All trackers should have the node available with data two 179 assertNotNull(localTracker.getData(false)); 180 assertNotNull(localTracker.blockUntilAvailable()); 181 assertTrue(Bytes.equals(localTracker.getData(false), dataTwo)); 182 assertNotNull(secondTracker.getData(false)); 183 assertNotNull(secondTracker.blockUntilAvailable()); 184 assertTrue(Bytes.equals(secondTracker.getData(false), dataTwo)); 185 assertTrue(thread.hasData); 186 assertTrue(Bytes.equals(thread.tracker.getData(false), dataTwo)); 187 LOG.info("Successfully got data two on all trackers and threads"); 188 189 // Change the data back to data one 190 zkconn.setData(node, dataOne, -1); 191 192 // Wait for zk event to be processed 193 zkListener.waitForDataChange(); 194 195 // All trackers should have the node available with data one 196 assertNotNull(localTracker.getData(false)); 197 assertNotNull(localTracker.blockUntilAvailable()); 198 assertTrue(Bytes.equals(localTracker.getData(false), dataOne)); 199 assertNotNull(secondTracker.getData(false)); 200 assertNotNull(secondTracker.blockUntilAvailable()); 201 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne)); 202 assertTrue(thread.hasData); 203 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne)); 204 LOG.info("Successfully got data one following a data change on all trackers and threads"); 205 } 206 207 public static class WaitToGetDataThread extends Thread { 208 TestTracker tracker; 209 boolean hasData; 210 211 WaitToGetDataThread(ZKWatcher zk, String node) { 212 tracker = new TestTracker(zk, node, null); 213 tracker.start(); 214 zk.registerListener(tracker); 215 hasData = false; 216 } 217 218 WaitToGetDataThread(TestTracker tracker) { 219 this.tracker = tracker; 220 hasData = false; 221 } 222 223 @Override 224 public void run() { 225 LOG.info("Waiting for data to be available in WaitToGetDataThread"); 226 try { 227 tracker.blockUntilAvailable(); 228 } catch (InterruptedException e) { 229 e.printStackTrace(); 230 } 231 LOG.info("Data now available in tracker from WaitToGetDataThread"); 232 hasData = true; 233 } 234 } 235 236 public static class TestTracker extends ZKNodeTracker { 237 TestTracker(ZKWatcher watcher, String node, Abortable abortable) { 238 super(watcher, node, abortable); 239 } 240 } 241 242 public static class TestingZKListener extends ZKListener { 243 private static final Logger LOG = LoggerFactory.getLogger(TestingZKListener.class); 244 245 private Semaphore deletedLock; 246 private Semaphore createdLock; 247 private Semaphore changedLock; 248 private String node; 249 250 TestingZKListener(ZKWatcher watcher, String node) { 251 super(watcher); 252 deletedLock = new Semaphore(0); 253 createdLock = new Semaphore(0); 254 changedLock = new Semaphore(0); 255 this.node = node; 256 } 257 258 @Override 259 public void nodeDeleted(String path) { 260 if (path.equals(node)) { 261 LOG.debug("nodeDeleted(" + path + ")"); 262 deletedLock.release(); 263 } 264 } 265 266 @Override 267 public void nodeCreated(String path) { 268 if (path.equals(node)) { 269 LOG.debug("nodeCreated(" + path + ")"); 270 createdLock.release(); 271 } 272 } 273 274 @Override 275 public void nodeDataChanged(String path) { 276 if (path.equals(node)) { 277 LOG.debug("nodeDataChanged(" + path + ")"); 278 changedLock.release(); 279 } 280 } 281 282 void waitForDeletion() throws InterruptedException { 283 deletedLock.acquire(); 284 } 285 286 void waitForCreation() throws InterruptedException { 287 createdLock.acquire(); 288 } 289 290 void waitForDataChange() throws InterruptedException { 291 changedLock.acquire(); 292 } 293 } 294 295 public static class StubAbortable implements Abortable { 296 @Override 297 public void abort(final String msg, final Throwable t) { 298 } 299 300 @Override 301 public boolean isAborted() { 302 return false; 303 } 304 } 305 306 @Test 307 public void testCleanZNode() throws Exception { 308 ZKWatcher zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), "testNodeTracker", 309 new TestZKNodeTracker.StubAbortable()); 310 311 final ServerName sn = ServerName.valueOf("127.0.0.1:52", 45L); 312 313 ZKUtil.createAndFailSilent(zkw, TEST_UTIL.getConfiguration() 314 .get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); 315 316 final String nodeName = zkw.getZNodePaths().masterAddressZNode; 317 318 // Check that we manage the case when there is no data 319 ZKUtil.createAndFailSilent(zkw, nodeName); 320 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); 321 assertNotNull(ZKUtil.getData(zkw, nodeName)); 322 323 // Check that we don't delete if we're not supposed to 324 ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0)); 325 MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString()); 326 assertNotNull(ZKUtil.getData(zkw, nodeName)); 327 328 // Check that we delete when we're supposed to 329 ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0)); 330 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); 331 assertNull(ZKUtil.getData(zkw, nodeName)); 332 333 // Check that we support the case when the znode does not exist 334 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); // must not throw an exception 335 } 336}