001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.util.concurrent.Semaphore; 028import java.util.concurrent.ThreadLocalRandom; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseZKTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.testclassification.ZKTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.zookeeper.CreateMode; 039import org.apache.zookeeper.ZooDefs.Ids; 040import org.apache.zookeeper.ZooKeeper; 041import org.junit.AfterClass; 042import org.junit.BeforeClass; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049@Category({ ZKTests.class, MediumTests.class }) 050public class TestZKNodeTracker { 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestZKNodeTracker.class); 054 055 private static final Logger LOG = LoggerFactory.getLogger(TestZKNodeTracker.class); 056 private final static HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); 057 058 @BeforeClass 059 public static void setUpBeforeClass() throws Exception { 060 TEST_UTIL.startMiniZKCluster(); 061 } 062 063 @AfterClass 064 public static void tearDownAfterClass() throws Exception { 065 TEST_UTIL.shutdownMiniZKCluster(); 066 } 067 068 /** 069 * Test that we can interrupt a node that is blocked on a wait. 070 */ 071 @Test 072 public void testInterruptible() throws IOException, InterruptedException { 073 Abortable abortable = new StubAbortable(); 074 ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable); 075 final TestTracker tracker = new TestTracker(zk, "/xyz", abortable); 076 tracker.start(); 077 Thread t = new Thread(() -> { 078 try { 079 tracker.blockUntilAvailable(); 080 } catch (InterruptedException e) { 081 throw new RuntimeException("Interrupted", e); 082 } 083 }); 084 t.start(); 085 while (!t.isAlive()) { 086 Threads.sleep(1); 087 } 088 tracker.stop(); 089 t.join(); 090 // If it wasn't interruptible, we'd never get to here. 091 } 092 093 @Test 094 public void testNodeTracker() throws Exception { 095 Abortable abortable = new StubAbortable(); 096 ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), 097 "testNodeTracker", abortable); 098 ZKUtil.createAndFailSilent(zk, zk.znodePaths.baseZNode); 099 100 final String node = ZNodePaths.joinZNode(zk.znodePaths.baseZNode, 101 Long.toString(ThreadLocalRandom.current().nextLong())); 102 103 final byte [] dataOne = Bytes.toBytes("dataOne"); 104 final byte [] dataTwo = Bytes.toBytes("dataTwo"); 105 106 // Start a ZKNT with no node currently available 107 TestTracker localTracker = new TestTracker(zk, node, abortable); 108 localTracker.start(); 109 zk.registerListener(localTracker); 110 111 // Make sure we don't have a node 112 assertNull(localTracker.getData(false)); 113 114 // Spin up a thread with another ZKNT and have it block 115 WaitToGetDataThread thread = new WaitToGetDataThread(zk, node); 116 thread.start(); 117 118 // Verify the thread doesn't have a node 119 assertFalse(thread.hasData); 120 121 // Now, start a new ZKNT with the node already available 122 TestTracker secondTracker = new TestTracker(zk, node, null); 123 secondTracker.start(); 124 zk.registerListener(secondTracker); 125 126 // Put up an additional zk listener so we know when zk event is done 127 TestingZKListener zkListener = new TestingZKListener(zk, node); 128 zk.registerListener(zkListener); 129 assertEquals(0, zkListener.createdLock.availablePermits()); 130 131 // Create a completely separate zk connection for test triggers and avoid 132 // any weird watcher interactions from the test 133 final ZooKeeper zkconn = ZooKeeperHelper. 134 getConnectedZooKeeper(ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()), 135 60000); 136 137 // Add the node with data one 138 zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 139 140 // Wait for the zk event to be processed 141 zkListener.waitForCreation(); 142 thread.join(); 143 144 // Both trackers should have the node available with data one 145 assertNotNull(localTracker.getData(false)); 146 assertNotNull(localTracker.blockUntilAvailable()); 147 assertTrue(Bytes.equals(localTracker.getData(false), dataOne)); 148 assertTrue(thread.hasData); 149 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne)); 150 LOG.info("Successfully got data one"); 151 152 // Make sure it's available and with the expected data 153 assertNotNull(secondTracker.getData(false)); 154 assertNotNull(secondTracker.blockUntilAvailable()); 155 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne)); 156 LOG.info("Successfully got data one with the second tracker"); 157 158 // Drop the node 159 zkconn.delete(node, -1); 160 zkListener.waitForDeletion(); 161 162 // Create a new thread but with the existing thread's tracker to wait 163 TestTracker threadTracker = thread.tracker; 164 thread = new WaitToGetDataThread(threadTracker); 165 thread.start(); 166 167 // Verify other guys don't have data 168 assertFalse(thread.hasData); 169 assertNull(secondTracker.getData(false)); 170 assertNull(localTracker.getData(false)); 171 LOG.info("Successfully made unavailable"); 172 173 // Create with second data 174 zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 175 176 // Wait for the zk event to be processed 177 zkListener.waitForCreation(); 178 thread.join(); 179 180 // All trackers should have the node available with data two 181 assertNotNull(localTracker.getData(false)); 182 assertNotNull(localTracker.blockUntilAvailable()); 183 assertTrue(Bytes.equals(localTracker.getData(false), dataTwo)); 184 assertNotNull(secondTracker.getData(false)); 185 assertNotNull(secondTracker.blockUntilAvailable()); 186 assertTrue(Bytes.equals(secondTracker.getData(false), dataTwo)); 187 assertTrue(thread.hasData); 188 assertTrue(Bytes.equals(thread.tracker.getData(false), dataTwo)); 189 LOG.info("Successfully got data two on all trackers and threads"); 190 191 // Change the data back to data one 192 zkconn.setData(node, dataOne, -1); 193 194 // Wait for zk event to be processed 195 zkListener.waitForDataChange(); 196 197 // All trackers should have the node available with data one 198 assertNotNull(localTracker.getData(false)); 199 assertNotNull(localTracker.blockUntilAvailable()); 200 assertTrue(Bytes.equals(localTracker.getData(false), dataOne)); 201 assertNotNull(secondTracker.getData(false)); 202 assertNotNull(secondTracker.blockUntilAvailable()); 203 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne)); 204 assertTrue(thread.hasData); 205 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne)); 206 LOG.info("Successfully got data one following a data change on all trackers and threads"); 207 } 208 209 public static class WaitToGetDataThread extends Thread { 210 TestTracker tracker; 211 boolean hasData; 212 213 WaitToGetDataThread(ZKWatcher zk, String node) { 214 tracker = new TestTracker(zk, node, null); 215 tracker.start(); 216 zk.registerListener(tracker); 217 hasData = false; 218 } 219 220 WaitToGetDataThread(TestTracker tracker) { 221 this.tracker = tracker; 222 hasData = false; 223 } 224 225 @Override 226 public void run() { 227 LOG.info("Waiting for data to be available in WaitToGetDataThread"); 228 try { 229 tracker.blockUntilAvailable(); 230 } catch (InterruptedException e) { 231 e.printStackTrace(); 232 } 233 LOG.info("Data now available in tracker from WaitToGetDataThread"); 234 hasData = true; 235 } 236 } 237 238 public static class TestTracker extends ZKNodeTracker { 239 TestTracker(ZKWatcher watcher, String node, Abortable abortable) { 240 super(watcher, node, abortable); 241 } 242 } 243 244 public static class TestingZKListener extends ZKListener { 245 private static final Logger LOG = LoggerFactory.getLogger(TestingZKListener.class); 246 247 private Semaphore deletedLock; 248 private Semaphore createdLock; 249 private Semaphore changedLock; 250 private String node; 251 252 TestingZKListener(ZKWatcher watcher, String node) { 253 super(watcher); 254 deletedLock = new Semaphore(0); 255 createdLock = new Semaphore(0); 256 changedLock = new Semaphore(0); 257 this.node = node; 258 } 259 260 @Override 261 public void nodeDeleted(String path) { 262 if(path.equals(node)) { 263 LOG.debug("nodeDeleted(" + path + ")"); 264 deletedLock.release(); 265 } 266 } 267 268 @Override 269 public void nodeCreated(String path) { 270 if(path.equals(node)) { 271 LOG.debug("nodeCreated(" + path + ")"); 272 createdLock.release(); 273 } 274 } 275 276 @Override 277 public void nodeDataChanged(String path) { 278 if(path.equals(node)) { 279 LOG.debug("nodeDataChanged(" + path + ")"); 280 changedLock.release(); 281 } 282 } 283 284 void waitForDeletion() throws InterruptedException { 285 deletedLock.acquire(); 286 } 287 288 void waitForCreation() throws InterruptedException { 289 createdLock.acquire(); 290 } 291 292 void waitForDataChange() throws InterruptedException { 293 changedLock.acquire(); 294 } 295 } 296 297 public static class StubAbortable implements Abortable { 298 @Override 299 public void abort(final String msg, final Throwable t) {} 300 301 @Override 302 public boolean isAborted() { 303 return false; 304 } 305 } 306 307 @Test 308 public void testCleanZNode() throws Exception { 309 ZKWatcher zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), 310 "testNodeTracker", new TestZKNodeTracker.StubAbortable()); 311 312 final ServerName sn = ServerName.valueOf("127.0.0.1:52", 45L); 313 314 ZKUtil.createAndFailSilent(zkw, 315 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT, 316 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); 317 318 final String nodeName = zkw.znodePaths.masterAddressZNode; 319 320 // Check that we manage the case when there is no data 321 ZKUtil.createAndFailSilent(zkw, nodeName); 322 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); 323 assertNotNull(ZKUtil.getData(zkw, nodeName)); 324 325 // Check that we don't delete if we're not supposed to 326 ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0)); 327 MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString()); 328 assertNotNull(ZKUtil.getData(zkw, nodeName)); 329 330 // Check that we delete when we're supposed to 331 ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn, 0)); 332 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); 333 assertNull(ZKUtil.getData(zkw, nodeName)); 334 335 // Check that we support the case when the znode does not exist 336 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); // must not throw an exception 337 } 338}