/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises client operations against a mini HBase cluster whose configuration points clients at
 * a dedicated "client" ZooKeeper cluster ({@link HConstants#CLIENT_ZOOKEEPER_QUORUM} /
 * {@link HConstants#CLIENT_ZOOKEEPER_CLIENT_PORT}) that is started separately from the mini
 * cluster itself.
 */
@Category(MediumTests.class)
public class TestSeparateClientZKCluster {
  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final File clientZkDir = new File("/tmp/TestSeparateClientZKCluster");
  private static final int ZK_SESSION_TIMEOUT = 5000;
  private static MiniZooKeeperCluster clientZkCluster;

  private final byte[] family = Bytes.toBytes("cf");
  private final byte[] qualifier = Bytes.toBytes("c1");
  private final byte[] row = Bytes.toBytes("row");
  private final byte[] value = Bytes.toBytes("v1");
  private final byte[] newVal = Bytes.toBytes("v2");

  @Rule
  public TestName name = new TestName();

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);

  /**
   * Starts the separate client ZooKeeper cluster, wires its address into the shared test
   * configuration and then starts a mini cluster with 2 masters and 3 region servers.
   */
  @BeforeClass
  public static void beforeAllTests() throws Exception {
    int clientZkPort = 21828;
    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
    clientZkCluster.setDefaultClientPort(clientZkPort);
    clientZkCluster.startup(clientZkDir);
    // reduce the retry number and start log counter
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
    // core settings for testing client ZK cluster
    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
    // reduce zk session timeout to easier trigger session expiration
    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
    // Start a cluster with 2 masters and 3 regionservers.
    StartMiniClusterOption option = StartMiniClusterOption.builder()
      .numMasters(2).numRegionServers(3).numDataNodes(3).build();
    TEST_UTIL.startMiniCluster(option);
  }

  /**
   * Shuts down the mini cluster and the client ZooKeeper cluster and removes the client ZooKeeper
   * data directory. Each step runs even if an earlier one throws, so a failed cluster shutdown
   * does not leave the ZooKeeper server or its on-disk data behind.
   */
  @AfterClass
  public static void afterAllTests() throws Exception {
    try {
      TEST_UTIL.shutdownMiniCluster();
    } finally {
      try {
        // May still be null if beforeAllTests() failed before creating it.
        if (clientZkCluster != null) {
          clientZkCluster.shutdown();
        }
      } finally {
        FileUtils.deleteDirectory(clientZkDir);
      }
    }
  }

  /**
   * Creates a table, writes one cell and reads it back.
   */
  @Test
  public void testBasicOperation() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    // create table
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); HTable table = (HTable) conn.getTable(tn)) {
      ColumnFamilyDescriptorBuilder cfDescBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(family);
      TableDescriptorBuilder tableDescBuilder =
        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
      admin.createTable(tableDescBuilder.build());
      // test simple get and put
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);
      table.put(put);
      Get get = new Get(row);
      Result result = table.get(get);
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /**
   * Stops the active master, waits for another master to finish initializing and verifies that an
   * {@link Admin} obtained before the switch can still issue requests.
   */
  @Test
  public void testMasterSwitch() throws Exception {
    // get an admin instance and issue some request first
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin()) {
      // The argument is evaluated regardless of the log level, so the request is always issued.
      LOG.debug("Tables: {}", admin.listTableDescriptors());
      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      // switch active master
      HMaster master = cluster.getMaster();
      master.stopMaster();
      while (!master.isShutDown()) {
        Thread.sleep(200);
      }
      while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) {
        Thread.sleep(200);
      }
      // confirm client access still works
      Assert.assertTrue(admin.balance(false));
    }
  }

  /**
   * Moves the meta region and then a user region to other region servers and verifies that a
   * table handle which already issued requests can still write and read the row afterwards.
   */
  @Test
  public void testMetaRegionMove() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    // create table
    Connection conn = TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); HTable table = (HTable) conn.getTable(tn)) {
      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      ColumnFamilyDescriptorBuilder cfDescBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(family);
      TableDescriptorBuilder tableDescBuilder =
        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
      admin.createTable(tableDescBuilder.build());
      // issue some requests to cache the region location
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);
      table.put(put);
      Get get = new Get(row);
      Result result = table.get(get);
      // move meta region and confirm client could detect
      ServerName metaDestination =
        pickLiveServerOtherThan(cluster, cluster.getServerHoldingMeta());
      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
        metaDestination);
      LOG.debug("Finished moving meta");
      // invalidate client cache
      RegionInfo region = table.getRegionLocator().getRegionLocation(row).getRegion();
      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
      ServerName regionDestination = pickLiveServerOtherThan(cluster, currentServer);
      admin.move(region.getEncodedNameAsBytes(), regionDestination);
      LOG.debug("Finished moving user region");
      put = new Put(row);
      put.addColumn(family, qualifier, newVal);
      table.put(put);
      result = table.get(get);
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
    }
  }

  /**
   * Shuts the client ZooKeeper cluster down, stops the region server holding meta, waits for meta
   * to be assigned again, restarts the client ZooKeeper cluster and verifies that a new read
   * succeeds.
   */
  @Test
  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    // create table
    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
    try (Admin admin = conn.getAdmin(); HTable table = (HTable) conn.getTable(tn)) {
      ColumnFamilyDescriptorBuilder cfDescBuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(family);
      TableDescriptorBuilder tableDescBuilder =
        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
      admin.createTable(tableDescBuilder.build());
      // put some data
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);
      table.put(put);
      // invalid connection cache
      conn.clearRegionLocationCache();
      // stop client zk cluster
      clientZkCluster.shutdown();
      // stop current meta server and confirm the server shutdown process
      // is not affected by client ZK crash
      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      int metaServerId = cluster.getServerWithMeta();
      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
      metaServer.stop("Stop current RS holding meta region");
      while (!metaServer.isShutDown()) {
        Thread.sleep(200);
      }
      // wait for meta region online
      AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
        RegionInfoBuilder.FIRST_META_REGIONINFO);
      // wait some long time to make sure we will retry sync data to client ZK until data set
      Thread.sleep(10000);
      clientZkCluster.startup(clientZkDir);
      // new request should pass
      Get get = new Get(row);
      Result result = table.get(get);
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /**
   * Creates a table, writes one cell and reads it back through the asynchronous client API.
   */
  @Test
  public void testAsyncTable() throws Exception {
    TableName tn = TableName.valueOf(name.getMethodName());
    ColumnFamilyDescriptorBuilder cfDescBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(family);
    TableDescriptorBuilder tableDescBuilder =
      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
    try (AsyncConnection asyncConn =
      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
      asyncConn.getAdmin().createTable(tableDescBuilder.build()).get();
      AsyncTable<?> table = asyncConn.getTable(tn);
      // put some data
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);
      table.put(put).get();
      // get and verify
      Get get = new Get(row);
      Result result = table.get(get).get();
      LOG.debug("Result: {}", Bytes.toString(result.getValue(family, qualifier)));
      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
    }
  }

  /**
   * Returns the name of the first live region server that is not {@code excluded}.
   *
   * @param cluster the running mini cluster to inspect
   * @param excluded the server to skip; may be {@code null}, in which case the first live server
   *          is returned
   * @return the name of a live region server different from {@code excluded}
   * @throws AssertionError if no such server exists, so the test fails with a clear message
   *           instead of issuing a move without a usable destination
   */
  private static ServerName pickLiveServerOtherThan(MiniHBaseCluster cluster,
    ServerName excluded) {
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      ServerName candidate = rst.getRegionServer().getServerName();
      if (!candidate.equals(excluded)) {
        return candidate;
      }
    }
    throw new AssertionError("No live region server other than " + excluded);
  }
}