001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertNull; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025 026import java.io.IOException; 027import java.util.concurrent.Semaphore; 028import java.util.concurrent.ThreadLocalRandom; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.HBaseZKTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.testclassification.ZKTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.zookeeper.CreateMode; 038import org.apache.zookeeper.ZooDefs.Ids; 039import org.apache.zookeeper.ZooKeeper; 040import org.junit.jupiter.api.AfterAll; 041import org.junit.jupiter.api.BeforeAll; 042import org.junit.jupiter.api.Tag; 043import org.junit.jupiter.api.Test; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047@Tag(ZKTests.TAG) 048@Tag(MediumTests.TAG) 049public class TestZKNodeTracker { 050 051 private static final Logger LOG = LoggerFactory.getLogger(TestZKNodeTracker.class); 052 private final static HBaseZKTestingUtil TEST_UTIL = new HBaseZKTestingUtil(); 053 054 @BeforeAll 055 public static void setUpBeforeClass() throws Exception { 056 TEST_UTIL.startMiniZKCluster(); 057 } 058 059 @AfterAll 060 public static void tearDownAfterClass() throws Exception { 061 TEST_UTIL.shutdownMiniZKCluster(); 062 } 063 064 /** 065 * Test that we can interrupt a node that is blocked on a wait. 066 */ 067 @Test 068 public void testInterruptible() throws IOException, InterruptedException { 069 Abortable abortable = new StubAbortable(); 070 ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable); 071 final TestTracker tracker = new TestTracker(zk, "/xyz", abortable); 072 tracker.start(); 073 Thread t = new Thread(() -> { 074 try { 075 tracker.blockUntilAvailable(); 076 } catch (InterruptedException e) { 077 throw new RuntimeException("Interrupted", e); 078 } 079 }); 080 t.start(); 081 while (!t.isAlive()) { 082 Threads.sleep(1); 083 } 084 tracker.stop(); 085 t.join(); 086 // If it wasn't interruptible, we'd never get to here. 087 } 088 089 @Test 090 public void testNodeTracker() throws Exception { 091 Abortable abortable = new StubAbortable(); 092 ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), "testNodeTracker", abortable); 093 ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode); 094 095 final String node = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, 096 Long.toString(ThreadLocalRandom.current().nextLong())); 097 098 final byte[] dataOne = Bytes.toBytes("dataOne"); 099 final byte[] dataTwo = Bytes.toBytes("dataTwo"); 100 101 // Start a ZKNT with no node currently available 102 TestTracker localTracker = new TestTracker(zk, node, abortable); 103 localTracker.start(); 104 zk.registerListener(localTracker); 105 106 // Make sure we don't have a node 107 assertNull(localTracker.getData(false)); 108 109 // Spin up a thread with another ZKNT and have it block 110 WaitToGetDataThread thread = new WaitToGetDataThread(zk, node); 111 thread.start(); 112 113 // Verify the thread doesn't have a node 114 assertFalse(thread.hasData); 115 116 // Now, start a new ZKNT with the node already available 117 TestTracker secondTracker = new TestTracker(zk, node, null); 118 secondTracker.start(); 119 zk.registerListener(secondTracker); 120 121 // Put up an additional zk listener so we know when zk event is done 122 TestingZKListener zkListener = new TestingZKListener(zk, node); 123 zk.registerListener(zkListener); 124 assertEquals(0, zkListener.createdLock.availablePermits()); 125 126 // Create a completely separate zk connection for test triggers and avoid 127 // any weird watcher interactions from the test 128 final ZooKeeper zkconn = ZooKeeperHelper.getConnectedZooKeeper( 129 ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()), 60000); 130 131 // Add the node with data one 132 zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 133 134 // Wait for the zk event to be processed 135 zkListener.waitForCreation(); 136 thread.join(); 137 138 // Both trackers should have the node available with data one 139 assertNotNull(localTracker.getData(false)); 140 assertNotNull(localTracker.blockUntilAvailable()); 141 assertTrue(Bytes.equals(localTracker.getData(false), dataOne)); 142 assertTrue(thread.hasData); 143 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne)); 144 LOG.info("Successfully got data one"); 145 146 // Make sure it's available and with the expected data 147 assertNotNull(secondTracker.getData(false)); 148 assertNotNull(secondTracker.blockUntilAvailable()); 149 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne)); 150 LOG.info("Successfully got data one with the second tracker"); 151 152 // Drop the node 153 zkconn.delete(node, -1); 154 zkListener.waitForDeletion(); 155 156 // Create a new thread but with the existing thread's tracker to wait 157 TestTracker threadTracker = thread.tracker; 158 thread = new WaitToGetDataThread(threadTracker); 159 thread.start(); 160 161 // Verify other guys don't have data 162 assertFalse(thread.hasData); 163 assertNull(secondTracker.getData(false)); 164 assertNull(localTracker.getData(false)); 165 LOG.info("Successfully made unavailable"); 166 167 // Create with second data 168 zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 169 170 // Wait for the zk event to be processed 171 zkListener.waitForCreation(); 172 thread.join(); 173 174 // All trackers should have the node available with data two 175 assertNotNull(localTracker.getData(false)); 176 assertNotNull(localTracker.blockUntilAvailable()); 177 assertTrue(Bytes.equals(localTracker.getData(false), dataTwo)); 178 assertNotNull(secondTracker.getData(false)); 179 assertNotNull(secondTracker.blockUntilAvailable()); 180 assertTrue(Bytes.equals(secondTracker.getData(false), dataTwo)); 181 assertTrue(thread.hasData); 182 assertTrue(Bytes.equals(thread.tracker.getData(false), dataTwo)); 183 LOG.info("Successfully got data two on all trackers and threads"); 184 185 // Change the data back to data one 186 zkconn.setData(node, dataOne, -1); 187 188 // Wait for zk event to be processed 189 zkListener.waitForDataChange(); 190 191 // All trackers should have the node available with data one 192 assertNotNull(localTracker.getData(false)); 193 assertNotNull(localTracker.blockUntilAvailable()); 194 assertTrue(Bytes.equals(localTracker.getData(false), dataOne)); 195 assertNotNull(secondTracker.getData(false)); 196 assertNotNull(secondTracker.blockUntilAvailable()); 197 assertTrue(Bytes.equals(secondTracker.getData(false), dataOne)); 198 assertTrue(thread.hasData); 199 assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne)); 200 LOG.info("Successfully got data one following a data change on all trackers and threads"); 201 } 202 203 public static class WaitToGetDataThread extends Thread { 204 TestTracker tracker; 205 boolean hasData; 206 207 WaitToGetDataThread(ZKWatcher zk, String node) { 208 tracker = new TestTracker(zk, node, null); 209 tracker.start(); 210 zk.registerListener(tracker); 211 hasData = false; 212 } 213 214 WaitToGetDataThread(TestTracker tracker) { 215 this.tracker = tracker; 216 hasData = false; 217 } 218 219 @Override 220 public void run() { 221 LOG.info("Waiting for data to be available in WaitToGetDataThread"); 222 try { 223 tracker.blockUntilAvailable(); 224 } catch (InterruptedException e) { 225 e.printStackTrace(); 226 } 227 LOG.info("Data now available in tracker from WaitToGetDataThread"); 228 hasData = true; 229 } 230 } 231 232 public static class TestTracker extends ZKNodeTracker { 233 TestTracker(ZKWatcher watcher, String node, Abortable abortable) { 234 super(watcher, node, abortable); 235 } 236 } 237 238 public static class TestingZKListener extends ZKListener { 239 private static final Logger LOG = LoggerFactory.getLogger(TestingZKListener.class); 240 241 private Semaphore deletedLock; 242 private Semaphore createdLock; 243 private Semaphore changedLock; 244 private String node; 245 246 TestingZKListener(ZKWatcher watcher, String node) { 247 super(watcher); 248 deletedLock = new Semaphore(0); 249 createdLock = new Semaphore(0); 250 changedLock = new Semaphore(0); 251 this.node = node; 252 } 253 254 @Override 255 public void nodeDeleted(String path) { 256 if (path.equals(node)) { 257 LOG.debug("nodeDeleted(" + path + ")"); 258 deletedLock.release(); 259 } 260 } 261 262 @Override 263 public void nodeCreated(String path) { 264 if (path.equals(node)) { 265 LOG.debug("nodeCreated(" + path + ")"); 266 createdLock.release(); 267 } 268 } 269 270 @Override 271 public void nodeDataChanged(String path) { 272 if (path.equals(node)) { 273 LOG.debug("nodeDataChanged(" + path + ")"); 274 changedLock.release(); 275 } 276 } 277 278 void waitForDeletion() throws InterruptedException { 279 deletedLock.acquire(); 280 } 281 282 void waitForCreation() throws InterruptedException { 283 createdLock.acquire(); 284 } 285 286 void waitForDataChange() throws InterruptedException { 287 changedLock.acquire(); 288 } 289 } 290 291 public static class StubAbortable implements Abortable { 292 @Override 293 public void abort(final String msg, final Throwable t) { 294 } 295 296 @Override 297 public boolean isAborted() { 298 return false; 299 } 300 } 301 302 @Test 303 public void testCleanZNode() throws Exception { 304 ZKWatcher zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), "testNodeTracker", 305 new TestZKNodeTracker.StubAbortable()); 306 307 final ServerName sn = ServerName.valueOf("127.0.0.1:52", 45L); 308 309 ZKUtil.createAndFailSilent(zkw, TEST_UTIL.getConfiguration() 310 .get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); 311 312 final String nodeName = zkw.getZNodePaths().masterAddressZNode; 313 314 // Check that we manage the case when there is no data 315 ZKUtil.createAndFailSilent(zkw, nodeName); 316 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); 317 assertNotNull(ZKUtil.getData(zkw, nodeName)); 318 319 // Check that we don't delete if we're not supposed to 320 ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0)); 321 MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString()); 322 assertNotNull(ZKUtil.getData(zkw, nodeName)); 323 324 // Check that we delete when we're supposed to 325 ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0)); 326 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); 327 assertNull(ZKUtil.getData(zkw, nodeName)); 328 329 // Check that we support the case when the znode does not exist 330 MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); // must not throw an exception 331 } 332}