001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023 024import java.io.File; 025import org.apache.commons.io.FileUtils; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseTestingUtility; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.MiniHBaseCluster; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.StartMiniClusterOption; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.TableNameTestRule; 034import org.apache.hadoop.hbase.master.HMaster; 035import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 036import org.apache.hadoop.hbase.regionserver.HRegionServer; 037import org.apache.hadoop.hbase.testclassification.ClientTests; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 041import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 042import org.junit.AfterClass; 043import org.junit.BeforeClass; 044import org.junit.ClassRule; 045import org.junit.Rule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051@Category({ ClientTests.class, MediumTests.class }) 052public class TestSeparateClientZKCluster { 053 private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class); 054 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 055 private static final File clientZkDir = 056 new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString()); 057 private static final int ZK_SESSION_TIMEOUT = 5000; 058 private static MiniZooKeeperCluster clientZkCluster; 059 060 private final byte[] family = Bytes.toBytes("cf"); 061 private final byte[] qualifier = Bytes.toBytes("c1"); 062 private final byte[] row = Bytes.toBytes("row"); 063 private final byte[] value = Bytes.toBytes("v1"); 064 private final byte[] newVal = Bytes.toBytes("v2"); 065 066 @Rule 067 public TableNameTestRule name = new TableNameTestRule(); 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class); 072 073 @BeforeClass 074 public static void beforeAllTests() throws Exception { 075 int clientZkPort = 21828; 076 clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration()); 077 clientZkCluster.setDefaultClientPort(clientZkPort); 078 clientZkCluster.startup(clientZkDir); 079 // reduce the retry number and start log counter 080 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 081 TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1); 082 TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1); 083 // core settings for testing client ZK cluster 084 TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 085 ZKConnectionRegistry.class, ConnectionRegistry.class); 086 TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST); 087 TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort); 088 // reduce zk session timeout to easier trigger session expiration 089 TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT); 090 // Start a cluster with 2 masters and 3 regionservers. 091 StartMiniClusterOption option = 092 StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build(); 093 TEST_UTIL.startMiniCluster(option); 094 } 095 096 @AfterClass 097 public static void afterAllTests() throws Exception { 098 TEST_UTIL.shutdownMiniCluster(); 099 clientZkCluster.shutdown(); 100 FileUtils.deleteDirectory(clientZkDir); 101 } 102 103 @Test 104 public void testBasicOperation() throws Exception { 105 TableName tn = name.getTableName(); 106 // create table 107 Connection conn = TEST_UTIL.getConnection(); 108 try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) { 109 ColumnFamilyDescriptorBuilder cfDescBuilder = 110 ColumnFamilyDescriptorBuilder.newBuilder(family); 111 TableDescriptorBuilder tableDescBuilder = 112 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 113 admin.createTable(tableDescBuilder.build()); 114 // test simple get and put 115 Put put = new Put(row); 116 put.addColumn(family, qualifier, value); 117 table.put(put); 118 Get get = new Get(row); 119 Result result = table.get(get); 120 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 121 assertArrayEquals(value, result.getValue(family, qualifier)); 122 } 123 } 124 125 @Test 126 public void testMasterSwitch() throws Exception { 127 // get an admin instance and issue some request first 128 Connection conn = TEST_UTIL.getConnection(); 129 try (Admin admin = conn.getAdmin()) { 130 LOG.debug("Tables: " + admin.listTableDescriptors()); 131 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 132 // switch active master 133 HMaster master = cluster.getMaster(); 134 master.stopMaster(); 135 LOG.info("Stopped master {}", master.getServerName()); 136 while (!master.isShutDown()) { 137 Thread.sleep(200); 138 } 139 LOG.info("Shutdown master {}", master.getServerName()); 140 while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) { 141 LOG.info("Get master {}", 142 cluster.getMaster() == null ? "null" : cluster.getMaster().getServerName()); 143 Thread.sleep(200); 144 } 145 LOG.info("Got master {}", cluster.getMaster().getServerName()); 146 // confirm client access still works 147 assertTrue(admin.balance(false)); 148 } 149 } 150 151 @Test 152 public void testMetaRegionMove() throws Exception { 153 TableName tn = name.getTableName(); 154 // create table 155 Connection conn = TEST_UTIL.getConnection(); 156 try (Admin admin = conn.getAdmin(); 157 Table table = conn.getTable(tn); 158 RegionLocator locator = conn.getRegionLocator(tn)) { 159 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 160 ColumnFamilyDescriptorBuilder cfDescBuilder = 161 ColumnFamilyDescriptorBuilder.newBuilder(family); 162 TableDescriptorBuilder tableDescBuilder = 163 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 164 admin.createTable(tableDescBuilder.build()); 165 // issue some requests to cache the region location 166 Put put = new Put(row); 167 put.addColumn(family, qualifier, value); 168 table.put(put); 169 Get get = new Get(row); 170 Result result = table.get(get); 171 // move meta region and confirm client could detect 172 ServerName destServerName = null; 173 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 174 ServerName name = rst.getRegionServer().getServerName(); 175 if (!name.equals(cluster.getServerHoldingMeta())) { 176 destServerName = name; 177 break; 178 } 179 } 180 admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName); 181 LOG.debug("Finished moving meta"); 182 // invalidate client cache 183 RegionInfo region = locator.getRegionLocation(row).getRegion(); 184 ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName()); 185 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 186 ServerName name = rst.getRegionServer().getServerName(); 187 if (!name.equals(currentServer)) { 188 destServerName = name; 189 break; 190 } 191 } 192 admin.move(region.getEncodedNameAsBytes(), destServerName); 193 LOG.debug("Finished moving user region"); 194 put = new Put(row); 195 put.addColumn(family, qualifier, newVal); 196 table.put(put); 197 result = table.get(get); 198 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 199 assertArrayEquals(newVal, result.getValue(family, qualifier)); 200 } 201 } 202 203 @Test 204 public void testMetaMoveDuringClientZkClusterRestart() throws Exception { 205 TableName tn = name.getTableName(); 206 // create table 207 Connection conn = TEST_UTIL.getConnection(); 208 try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) { 209 ColumnFamilyDescriptorBuilder cfDescBuilder = 210 ColumnFamilyDescriptorBuilder.newBuilder(family); 211 TableDescriptorBuilder tableDescBuilder = 212 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 213 admin.createTable(tableDescBuilder.build()); 214 // put some data 215 Put put = new Put(row); 216 put.addColumn(family, qualifier, value); 217 table.put(put); 218 // invalid connection cache 219 conn.clearRegionLocationCache(); 220 // stop client zk cluster 221 clientZkCluster.shutdown(); 222 // stop current meta server and confirm the server shutdown process 223 // is not affected by client ZK crash 224 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 225 int metaServerId = cluster.getServerWithMeta(); 226 HRegionServer metaServer = cluster.getRegionServer(metaServerId); 227 metaServer.stop("Stop current RS holding meta region"); 228 while (!metaServer.isShutDown()) { 229 Thread.sleep(200); 230 } 231 // wait for meta region online 232 AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(), 233 RegionInfoBuilder.FIRST_META_REGIONINFO); 234 // wait some long time to make sure we will retry sync data to client ZK until data set 235 Thread.sleep(10000); 236 clientZkCluster.startup(clientZkDir); 237 // new request should pass 238 Get get = new Get(row); 239 Result result = table.get(get); 240 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 241 assertArrayEquals(value, result.getValue(family, qualifier)); 242 } 243 } 244 245 @Test 246 public void testAsyncTable() throws Exception { 247 TableName tn = name.getTableName(); 248 ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 249 TableDescriptorBuilder tableDescBuilder = 250 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 251 try (AsyncConnection ASYNC_CONN = 252 ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { 253 ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get(); 254 AsyncTable<?> table = ASYNC_CONN.getTable(tn); 255 // put some data 256 Put put = new Put(row); 257 put.addColumn(family, qualifier, value); 258 table.put(put).get(); 259 // get and verify 260 Get get = new Get(row); 261 Result result = table.get(get).get(); 262 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 263 assertArrayEquals(value, result.getValue(family, qualifier)); 264 } 265 } 266 267 @Test 268 public void testChangeMetaReplicaCount() throws Exception { 269 Admin admin = TEST_UTIL.getAdmin(); 270 try (RegionLocator locator = 271 TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) { 272 assertEquals(1, locator.getAllRegionLocations().size()); 273 HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 3); 274 TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 3); 275 HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 2); 276 TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 2); 277 HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 1); 278 TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 1); 279 } 280 } 281}