001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022 023import java.io.File; 024import java.io.IOException; 025import org.apache.commons.io.FileUtils; 026import org.apache.hadoop.hbase.HBaseTestingUtil; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 030import org.apache.hadoop.hbase.StartTestingClusterOption; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.TableNameTestExtension; 033import org.apache.hadoop.hbase.master.HMaster; 034import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 035import org.apache.hadoop.hbase.regionserver.HRegionServer; 036import org.apache.hadoop.hbase.testclassification.ClientTests; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 040import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 041import org.junit.jupiter.api.AfterAll; 042import org.junit.jupiter.api.BeforeAll; 043import org.junit.jupiter.api.BeforeEach; 044import org.junit.jupiter.api.Tag; 045import org.junit.jupiter.api.Test; 046import org.junit.jupiter.api.extension.RegisterExtension; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050@Tag(ClientTests.TAG) 051@Tag(MediumTests.TAG) 052public class TestSeparateClientZKCluster { 053 private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class); 054 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 055 private static final File clientZkDir = 056 new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString()); 057 private static final int ZK_SESSION_TIMEOUT = 5000; 058 private static MiniZooKeeperCluster clientZkCluster; 059 060 private final byte[] family = Bytes.toBytes("cf"); 061 private final byte[] qualifier = Bytes.toBytes("c1"); 062 private final byte[] row = Bytes.toBytes("row"); 063 private final byte[] value = Bytes.toBytes("v1"); 064 private final byte[] newVal = Bytes.toBytes("v2"); 065 066 @RegisterExtension 067 private TableNameTestExtension name = new TableNameTestExtension(); 068 069 @BeforeAll 070 public static void beforeAllTests() throws Exception { 071 int clientZkPort = 21828; 072 clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration()); 073 clientZkCluster.setDefaultClientPort(clientZkPort); 074 clientZkCluster.startup(clientZkDir); 075 // start log counter 076 TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", 3); 077 TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1); 078 // core settings for testing client ZK cluster 079 TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 080 ZKConnectionRegistry.class, ConnectionRegistry.class); 081 TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST); 082 TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort); 083 // reduce zk session timeout to easier trigger session expiration 084 TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT); 085 // Start a cluster with 2 masters and 3 regionservers. 086 StartTestingClusterOption option = 087 StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build(); 088 TEST_UTIL.startMiniCluster(option); 089 } 090 091 @AfterAll 092 public static void afterAllTests() throws Exception { 093 TEST_UTIL.shutdownMiniCluster(); 094 clientZkCluster.shutdown(); 095 FileUtils.deleteDirectory(clientZkDir); 096 } 097 098 @BeforeEach 099 public void setUp() throws IOException { 100 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) { 101 waitForNewMasterUpAndAddressSynced(admin); 102 } 103 } 104 105 private void waitForNewMasterUpAndAddressSynced(Admin admin) { 106 TEST_UTIL.waitFor(30000, () -> { 107 try { 108 return admin.listNamespaces().length > 0; 109 } catch (Exception e) { 110 LOG.warn("failed to list namespaces", e); 111 return false; 112 } 113 }); 114 } 115 116 @Test 117 public void testBasicOperation() throws Exception { 118 TableName tn = name.getTableName(); 119 // create table 120 Connection conn = TEST_UTIL.getConnection(); 121 try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) { 122 ColumnFamilyDescriptorBuilder cfDescBuilder = 123 ColumnFamilyDescriptorBuilder.newBuilder(family); 124 TableDescriptorBuilder tableDescBuilder = 125 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 126 admin.createTable(tableDescBuilder.build()); 127 // test simple get and put 128 Put put = new Put(row); 129 put.addColumn(family, qualifier, value); 130 table.put(put); 131 Get get = new Get(row); 132 Result result = table.get(get); 133 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 134 assertArrayEquals(value, result.getValue(family, qualifier)); 135 } 136 } 137 138 @Test 139 public void testMasterSwitch() throws Exception { 140 // get an admin instance and issue some request first 141 Connection conn = TEST_UTIL.getConnection(); 142 try (Admin admin = conn.getAdmin()) { 143 LOG.debug("Tables: " + admin.listTableDescriptors()); 144 SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 145 // switch active master 146 HMaster master = cluster.getMaster(); 147 master.stopMaster(); 148 LOG.info("Stopped master {}", master.getServerName()); 149 TEST_UTIL.waitFor(30000, () -> !master.isAlive()); 150 LOG.info("Shutdown master {}", master.getServerName()); 151 TEST_UTIL.waitFor(30000, 152 () -> cluster.getMaster() != null && cluster.getMaster().isInitialized()); 153 LOG.info("Got master {}", cluster.getMaster().getServerName()); 154 // confirm client access still works 155 waitForNewMasterUpAndAddressSynced(admin); 156 } 157 } 158 159 @Test 160 public void testMetaRegionMove() throws Exception { 161 TableName tn = name.getTableName(); 162 // create table 163 Connection conn = TEST_UTIL.getConnection(); 164 try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn); 165 RegionLocator locator = conn.getRegionLocator(tn)) { 166 SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 167 ColumnFamilyDescriptorBuilder cfDescBuilder = 168 ColumnFamilyDescriptorBuilder.newBuilder(family); 169 TableDescriptorBuilder tableDescBuilder = 170 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 171 admin.createTable(tableDescBuilder.build()); 172 // issue some requests to cache the region location 173 Put put = new Put(row); 174 put.addColumn(family, qualifier, value); 175 table.put(put); 176 Get get = new Get(row); 177 Result result = table.get(get); 178 // move meta region and confirm client could detect 179 ServerName destServerName = null; 180 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 181 ServerName name = rst.getRegionServer().getServerName(); 182 if (!name.equals(cluster.getServerHoldingMeta())) { 183 destServerName = name; 184 break; 185 } 186 } 187 admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName); 188 LOG.debug("Finished moving meta"); 189 // invalidate client cache 190 RegionInfo region = locator.getRegionLocation(row).getRegion(); 191 ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName()); 192 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 193 ServerName name = rst.getRegionServer().getServerName(); 194 if (!name.equals(currentServer)) { 195 destServerName = name; 196 break; 197 } 198 } 199 admin.move(region.getEncodedNameAsBytes(), destServerName); 200 LOG.debug("Finished moving user region"); 201 put = new Put(row); 202 put.addColumn(family, qualifier, newVal); 203 table.put(put); 204 result = table.get(get); 205 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 206 assertArrayEquals(newVal, result.getValue(family, qualifier)); 207 } 208 } 209 210 @Test 211 public void testMetaMoveDuringClientZkClusterRestart() throws Exception { 212 TableName tn = name.getTableName(); 213 // create table 214 Connection conn = TEST_UTIL.getConnection(); 215 try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) { 216 ColumnFamilyDescriptorBuilder cfDescBuilder = 217 ColumnFamilyDescriptorBuilder.newBuilder(family); 218 TableDescriptorBuilder tableDescBuilder = 219 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 220 admin.createTable(tableDescBuilder.build()); 221 // put some data 222 Put put = new Put(row); 223 put.addColumn(family, qualifier, value); 224 table.put(put); 225 // invalid connection cache 226 conn.clearRegionLocationCache(); 227 // stop client zk cluster 228 clientZkCluster.shutdown(); 229 // stop current meta server and confirm the server shutdown process 230 // is not affected by client ZK crash 231 SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 232 int metaServerId = cluster.getServerWithMeta(); 233 HRegionServer metaServer = cluster.getRegionServer(metaServerId); 234 metaServer.stop("Stop current RS holding meta region"); 235 while (metaServer.isAlive()) { 236 Thread.sleep(200); 237 } 238 // wait for meta region online 239 AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(), 240 RegionInfoBuilder.FIRST_META_REGIONINFO); 241 // wait some long time to make sure we will retry sync data to client ZK until data set 242 Thread.sleep(10000); 243 clientZkCluster.startup(clientZkDir); 244 // new request should pass 245 Get get = new Get(row); 246 Result result = table.get(get); 247 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 248 assertArrayEquals(value, result.getValue(family, qualifier)); 249 } 250 } 251 252 @Test 253 public void testAsyncTable() throws Exception { 254 TableName tn = name.getTableName(); 255 ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 256 TableDescriptorBuilder tableDescBuilder = 257 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 258 try (AsyncConnection ASYNC_CONN = 259 ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { 260 ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get(); 261 AsyncTable<?> table = ASYNC_CONN.getTable(tn); 262 // put some data 263 Put put = new Put(row); 264 put.addColumn(family, qualifier, value); 265 table.put(put).get(); 266 // get and verify 267 Get get = new Get(row); 268 Result result = table.get(get).get(); 269 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 270 assertArrayEquals(value, result.getValue(family, qualifier)); 271 } 272 } 273 274 @Test 275 public void testChangeMetaReplicaCount() throws Exception { 276 Admin admin = TEST_UTIL.getAdmin(); 277 try (RegionLocator locator = 278 TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) { 279 assertEquals(1, locator.getAllRegionLocations().size()); 280 HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3); 281 TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 3); 282 HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 2); 283 TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 2); 284 HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 1); 285 TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 1); 286 } 287 } 288}