/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises client operations against a mini cluster whose client-facing ZooKeeper settings
 * ({@link HConstants#CLIENT_ZOOKEEPER_QUORUM} / {@link HConstants#CLIENT_ZOOKEEPER_CLIENT_PORT})
 * point at a dedicated {@link MiniZooKeeperCluster} started by this class, in addition to the
 * mini cluster started through {@link HBaseTestingUtility}.
 * <p>
 * All tests share one mini cluster and one client ZK cluster; a test that stops the client ZK
 * cluster is responsible for bringing it back before it returns.
 */
@Category(MediumTests.class)
public class TestSeparateClientZKCluster {
  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final File clientZkDir =
    new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString());
  private static final int ZK_SESSION_TIMEOUT = 5000;
  /** Port handed to the client ZK cluster as its preferred (seed) client port. */
  private static final int CLIENT_ZK_SEED_PORT = 21828;
  private static MiniZooKeeperCluster clientZkCluster;
  /** Port reported by the client ZK cluster at startup; this is what clients are configured with. */
  private static int clientZkPort;

  private final byte[] family = Bytes.toBytes("cf");
  private final byte[] qualifier = Bytes.toBytes("c1");
  private final byte[] row = Bytes.toBytes("row");
  private final byte[] value = Bytes.toBytes("v1");
  private final byte[] newVal = Bytes.toBytes("v2");

  @Rule
  public final TestName name = new TestName();

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);

  /**
   * Starts the dedicated client ZK cluster, points the client configuration at it, and then
   * starts a mini cluster with 2 masters, 3 region servers and 3 data nodes.
   */
  @BeforeClass
  public static void beforeAllTests() throws Exception {
    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
    clientZkCluster.setDefaultClientPort(CLIENT_ZK_SEED_PORT);
    // Configure clients with the port startup() reports rather than the seed, so that a busy
    // seed port does not leave the clients pointing at somebody else's socket.
    clientZkPort = clientZkCluster.startup(clientZkDir);
    Assert.assertTrue("Failed to start client ZK cluster", clientZkPort > 0);
    // reduce the retry number and start log counter
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    // core settings for testing client ZK cluster
    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
    // reduce zk session timeout to easier trigger session expiration
    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
    // Start a cluster with 2 masters and 3 regionservers.
    StartMiniClusterOption option = StartMiniClusterOption.builder()
      .numMasters(2).numRegionServers(3).numDataNodes(3).build();
    TEST_UTIL.startMiniCluster(option);
  }

  /**
   * Shuts down the mini cluster and the client ZK cluster and removes the client ZK data
   * directory. Each step runs even if an earlier one throws.
   */
  @AfterClass
  public static void afterAllTests() throws Exception {
    try {
      TEST_UTIL.shutdownMiniCluster();
    } finally {
      try {
        // May still be null if beforeAllTests() failed before creating it.
        if (clientZkCluster != null) {
          clientZkCluster.shutdown();
        }
      } finally {
        FileUtils.deleteDirectory(clientZkDir);
      }
    }
  }

  /** Creates a table, writes one cell and reads it back. */
  @Test
  public void testBasicOperation() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
      // create table
      admin.createTable(newTableDescriptor(tn));
      // test simple get and put
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);
      table.put(put);
      Result result = table.get(new Get(row));
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /**
   * Stops the active master, waits until another master is initialized, and checks that an
   * {@link Admin} obtained before the switch can still issue a request.
   */
  @Test
  public void testMasterSwitch() throws Exception {
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin()) {
      // get an admin instance and issue some request first
      LOG.debug("Tables: {}", admin.listTableDescriptors());
      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      // switch active master
      HMaster master = cluster.getMaster();
      master.stopMaster();
      LOG.info("Stopped master {}", master.getServerName());
      while (!master.isShutDown()) {
        Thread.sleep(200);
      }
      LOG.info("Shutdown master {}", master.getServerName());
      // Take one snapshot of the active master per poll: re-reading cluster.getMaster() between
      // the null check and the dereference could observe null while the masters are switching.
      HMaster activeMaster = cluster.getMaster();
      while (activeMaster == null || !activeMaster.isInitialized()) {
        LOG.info("Get master {}", activeMaster == null ? "null" : activeMaster.getServerName());
        Thread.sleep(200);
        activeMaster = cluster.getMaster();
      }
      LOG.info("Got master {}", activeMaster.getServerName());
      // confirm client access still works
      Assert.assertTrue(admin.balance(false));
    }
  }

  /**
   * Moves the meta region and then the user region to other region servers and checks that a
   * table handle created before the moves can still write and read.
   */
  @Test
  public void testMetaRegionMove() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn);
      RegionLocator locator = conn.getRegionLocator(tn)) {
      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      // create table
      admin.createTable(newTableDescriptor(tn));
      // issue some requests to cache the region location
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);
      table.put(put);
      Get get = new Get(row);
      table.get(get);
      // move meta region and confirm client could detect
      ServerName metaDest = pickServerOtherThan(cluster, cluster.getServerHoldingMeta());
      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), metaDest);
      LOG.debug("Finished moving meta");
      // invalidate client cache
      RegionInfo region = locator.getRegionLocation(row).getRegion();
      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
      ServerName regionDest = pickServerOtherThan(cluster, currentServer);
      admin.move(region.getEncodedNameAsBytes(), regionDest);
      LOG.debug("Finished moving user region");
      put = new Put(row);
      put.addColumn(family, qualifier, newVal);
      table.put(put);
      Result result = table.get(get);
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
    }
  }

  /**
   * Stops the client ZK cluster, stops the region server holding meta, waits for meta to be
   * assigned again, restarts the client ZK cluster and checks that a read succeeds afterwards.
   */
  @Test
  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
      // create table
      admin.createTable(newTableDescriptor(tn));
      // put some data
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);
      table.put(put);
      // invalid connection cache
      conn.clearRegionLocationCache();
      // stop client zk cluster
      clientZkCluster.shutdown();
      boolean clientZkRestarted = false;
      try {
        // stop current meta server and confirm the server shutdown process
        // is not affected by client ZK crash
        MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
        int metaServerId = cluster.getServerWithMeta();
        HRegionServer metaServer = cluster.getRegionServer(metaServerId);
        metaServer.stop("Stop current RS holding meta region");
        while (!metaServer.isShutDown()) {
          Thread.sleep(200);
        }
        // wait for meta region online
        AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
          RegionInfoBuilder.FIRST_META_REGIONINFO);
        // wait some long time to make sure we will retry sync data to client ZK until data set
        Thread.sleep(10000);
        restartClientZkCluster();
        clientZkRestarted = true;
      } finally {
        // The client ZK cluster is shared by every test in this class; never leave it down
        // because this test failed part-way through.
        if (!clientZkRestarted) {
          restartClientZkCluster();
        }
      }
      // new request should pass
      Result result = table.get(new Get(row));
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /** Creates a table, writes one cell and reads it back through the asynchronous client. */
  @Test
  public void testAsyncTable() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    try (AsyncConnection asyncConn =
      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
      asyncConn.getAdmin().createTable(newTableDescriptor(tn)).get();
      AsyncTable<?> table = asyncConn.getTable(tn);
      // put some data
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);
      table.put(put).get();
      // get and verify
      Result result = table.get(new Get(row)).get();
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /** Builds a descriptor for {@code tn} with the single test column family. */
  private TableDescriptor newTableDescriptor(TableName tn) {
    return TableDescriptorBuilder.newBuilder(tn)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).build()).build();
  }

  /**
   * Returns the first live region server whose name differs from {@code excluded}; fails the
   * calling test if there is none.
   */
  private static ServerName pickServerOtherThan(MiniHBaseCluster cluster, ServerName excluded) {
    ServerName dest = null;
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      ServerName candidate = rst.getRegionServer().getServerName();
      if (!candidate.equals(excluded)) {
        dest = candidate;
        break;
      }
    }
    Assert.assertNotNull("No live region server other than " + excluded, dest);
    return dest;
  }

  /**
   * Starts the client ZK cluster again on the port the clients were configured with, and fails
   * if it reports a different port.
   */
  private static void restartClientZkCluster() throws Exception {
    clientZkCluster.setDefaultClientPort(clientZkPort);
    int restartedPort = clientZkCluster.startup(clientZkDir);
    Assert.assertEquals("Client ZK cluster restarted on a different port", clientZkPort,
      restartedPort);
  }
}