001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertArrayEquals; 023import static org.junit.jupiter.api.Assertions.assertEquals; 024import static org.junit.jupiter.api.Assertions.assertNotEquals; 025import static org.junit.jupiter.api.Assertions.assertNotNull; 026import static org.junit.jupiter.api.Assertions.assertNotSame; 027import static org.junit.jupiter.api.Assertions.assertNull; 028import static org.junit.jupiter.api.Assertions.assertSame; 029import static org.junit.jupiter.api.Assertions.fail; 030import static org.mockito.ArgumentMatchers.any; 031import static org.mockito.ArgumentMatchers.anyBoolean; 032import static org.mockito.ArgumentMatchers.anyString; 033import static org.mockito.Mockito.doAnswer; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.never; 036import static org.mockito.Mockito.times; 037import static org.mockito.Mockito.verify; 038import static org.mockito.Mockito.when; 039 040import java.io.IOException; 041import java.util.Collections; 042import java.util.List; 043import java.util.concurrent.CompletableFuture; 044import java.util.concurrent.Exchanger; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.TimeUnit; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.hbase.HBaseZKTestingUtil; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.testclassification.ZKTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.Threads; 055import org.apache.zookeeper.AsyncCallback; 056import org.apache.zookeeper.CreateMode; 057import org.apache.zookeeper.KeeperException; 058import org.apache.zookeeper.KeeperException.Code; 059import org.apache.zookeeper.ZooDefs; 060import org.apache.zookeeper.ZooKeeper; 061import org.junit.jupiter.api.AfterAll; 062import org.junit.jupiter.api.BeforeAll; 063import org.junit.jupiter.api.Tag; 064import org.junit.jupiter.api.Test; 065 066import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 067import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 068 069@Tag(ZKTests.TAG) 070@Tag(MediumTests.TAG) 071public class TestReadOnlyZKClient { 072 073 private static HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil(); 074 075 private static String PATH = "/test"; 076 077 private static byte[] DATA; 078 079 private static int CHILDREN = 5; 080 081 private static ReadOnlyZKClient RO_ZK; 082 private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( 083 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) 084 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 085 10, TimeUnit.MILLISECONDS); 086 087 @BeforeAll 088 public static void setUp() throws Exception { 089 final int port = UTIL.startMiniZKCluster().getClientPort(); 090 String hostPort = UTIL.getZkCluster().getAddress().toString(); 091 092 ZooKeeper zk = ZooKeeperHelper.getConnectedZooKeeper(hostPort, 10000); 093 DATA = new byte[10]; 094 Bytes.random(DATA); 095 zk.create(PATH, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 096 for (int i = 0; i < CHILDREN; i++) { 097 zk.create(PATH + "/c" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 098 } 099 zk.close(); 100 Configuration conf = UTIL.getConfiguration(); 101 conf.set(HConstants.ZOOKEEPER_QUORUM, hostPort); 102 conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3); 103 conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100); 104 conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000); 105 RO_ZK = new ReadOnlyZKClient(conf, RETRY_TIMER); 106 // only connect when necessary 107 assertNull(RO_ZK.zookeeper); 108 } 109 110 @AfterAll 111 public static void tearDown() throws IOException { 112 RETRY_TIMER.stop(); 113 RO_ZK.close(); 114 UTIL.shutdownMiniZKCluster(); 115 UTIL.cleanupTestDir(); 116 } 117 118 private void waitForIdleConnectionClosed() throws Exception { 119 // The zookeeper client should be closed finally after the keep alive time elapsed 120 UTIL.waitFor(10000, new ExplainingPredicate<Exception>() { 121 122 @Override 123 public boolean evaluate() { 124 return RO_ZK.zookeeper == null; 125 } 126 127 @Override 128 public String explainFailure() { 129 return "Connection to zookeeper is still alive"; 130 } 131 }); 132 } 133 134 @Test 135 public void testRead() throws Exception { 136 assertArrayEquals(DATA, RO_ZK.get(PATH).get()); 137 assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren()); 138 List<String> children = RO_ZK.list(PATH).get(); 139 assertEquals(CHILDREN, children.size()); 140 Collections.sort(children); 141 for (int i = 0; i < CHILDREN; i++) { 142 assertEquals("c" + i, children.get(i)); 143 } 144 assertNotNull(RO_ZK.zookeeper); 145 waitForIdleConnectionClosed(); 146 } 147 148 @Test 149 public void testNoNode() throws InterruptedException, ExecutionException { 150 String pathNotExists = PATH + "_whatever"; 151 try { 152 RO_ZK.get(pathNotExists).get(); 153 fail("should fail because of " + pathNotExists + " does not exist"); 154 } catch (ExecutionException e) { 155 assertThat(e.getCause(), instanceOf(KeeperException.class)); 156 KeeperException ke = (KeeperException) e.getCause(); 157 assertEquals(Code.NONODE, ke.code()); 158 assertEquals(pathNotExists, ke.getPath()); 159 } 160 try { 161 RO_ZK.list(pathNotExists).get(); 162 fail("should fail because of " + pathNotExists + " does not exist"); 163 } catch (ExecutionException e) { 164 assertThat(e.getCause(), instanceOf(KeeperException.class)); 165 KeeperException ke = (KeeperException) e.getCause(); 166 assertEquals(Code.NONODE, ke.code()); 167 assertEquals(pathNotExists, ke.getPath()); 168 } 169 // exists will not throw exception. 170 assertNull(RO_ZK.exists(pathNotExists).get()); 171 } 172 173 @Test 174 public void testSessionExpire() throws Exception { 175 assertArrayEquals(DATA, RO_ZK.get(PATH).get()); 176 ZooKeeper zk = RO_ZK.zookeeper; 177 long sessionId = zk.getSessionId(); 178 UTIL.getZkCluster().getZooKeeperServers().get(0).closeSession(sessionId); 179 // should not reach keep alive so still the same instance 180 assertSame(zk, RO_ZK.zookeeper); 181 byte[] got = RO_ZK.get(PATH).get(); 182 assertArrayEquals(DATA, got); 183 assertNotNull(RO_ZK.zookeeper); 184 assertNotSame(zk, RO_ZK.zookeeper); 185 assertNotEquals(sessionId, RO_ZK.zookeeper.getSessionId()); 186 } 187 188 @Test 189 public void testNotCloseZkWhenPending() throws Exception { 190 ZooKeeper mockedZK = mock(ZooKeeper.class); 191 Exchanger<AsyncCallback.DataCallback> exchanger = new Exchanger<>(); 192 doAnswer(i -> { 193 exchanger.exchange(i.getArgument(2)); 194 return null; 195 }).when(mockedZK).getData(anyString(), anyBoolean(), any(AsyncCallback.DataCallback.class), 196 any()); 197 doAnswer(i -> null).when(mockedZK).close(); 198 when(mockedZK.getState()).thenReturn(ZooKeeper.States.CONNECTED); 199 RO_ZK.zookeeper = mockedZK; 200 CompletableFuture<byte[]> future = RO_ZK.get(PATH); 201 AsyncCallback.DataCallback callback = exchanger.exchange(null); 202 // 2 * keep alive time to ensure that we will not close the zk when there are pending requests 203 Thread.sleep(6000); 204 assertNotNull(RO_ZK.zookeeper); 205 verify(mockedZK, never()).close(); 206 callback.processResult(Code.OK.intValue(), PATH, null, DATA, null); 207 assertArrayEquals(DATA, future.get()); 208 // now we will close the idle connection. 209 waitForIdleConnectionClosed(); 210 verify(mockedZK, times(1)).close(); 211 } 212 213 @Test 214 public void testReadWithTimeout() throws Exception { 215 assertArrayEquals(DATA, RO_ZK.get(PATH, 10000).get()); 216 assertEquals(CHILDREN, RO_ZK.exists(PATH, 10000).get().getNumChildren()); 217 List<String> children = RO_ZK.list(PATH, 10000).get(); 218 assertEquals(CHILDREN, children.size()); 219 Collections.sort(children); 220 for (int i = 0; i < CHILDREN; i++) { 221 assertEquals("c" + i, children.get(i)); 222 } 223 assertNotNull(RO_ZK.zookeeper); 224 waitForIdleConnectionClosed(); 225 } 226}