001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.File; 021import org.apache.commons.io.FileUtils; 022import org.apache.hadoop.hbase.HBaseClassTestRule; 023import org.apache.hadoop.hbase.HBaseTestingUtility; 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.MiniHBaseCluster; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.master.HMaster; 029import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 030import org.apache.hadoop.hbase.regionserver.HRegionServer; 031import org.apache.hadoop.hbase.testclassification.MediumTests; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 034import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 035import org.junit.AfterClass; 036import org.junit.Assert; 037import org.junit.BeforeClass; 038import org.junit.ClassRule; 039import org.junit.Rule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.junit.rules.TestName; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046@Category(MediumTests.class) 047public class TestSeparateClientZKCluster { 048 private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class); 049 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 050 private static final File clientZkDir = new File("/tmp/TestSeparateClientZKCluster"); 051 private static final int ZK_SESSION_TIMEOUT = 5000; 052 private static MiniZooKeeperCluster clientZkCluster; 053 054 private final byte[] family = Bytes.toBytes("cf"); 055 private final byte[] qualifier = Bytes.toBytes("c1"); 056 private final byte[] row = Bytes.toBytes("row"); 057 private final byte[] value = Bytes.toBytes("v1"); 058 private final byte[] newVal = Bytes.toBytes("v2"); 059 060 @Rule 061 public TestName name = new TestName(); 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class); 066 067 @BeforeClass 068 public static void beforeAllTests() throws Exception { 069 int clientZkPort = 21828; 070 clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration()); 071 clientZkCluster.setDefaultClientPort(clientZkPort); 072 clientZkCluster.startup(clientZkDir); 073 // reduce the retry number and start log counter 074 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 075 TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1); 076 TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1); 077 // core settings for testing client ZK cluster 078 TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST); 079 TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort); 080 // reduce zk session timeout to easier trigger session expiration 081 TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT); 082 // Start a cluster with 2 masters and 3 regionservers. 083 TEST_UTIL.startMiniCluster(2, 3); 084 } 085 086 @AfterClass 087 public static void afterAllTests() throws Exception { 088 TEST_UTIL.shutdownMiniCluster(); 089 clientZkCluster.shutdown(); 090 FileUtils.deleteDirectory(clientZkDir); 091 } 092 093 @Test 094 public void testBasicOperation() throws Exception { 095 TableName tn = TableName.valueOf(name.getMethodName()); 096 // create table 097 Connection conn = TEST_UTIL.getConnection(); 098 Admin admin = conn.getAdmin(); 099 HTable table = (HTable) conn.getTable(tn); 100 try { 101 ColumnFamilyDescriptorBuilder cfDescBuilder = 102 ColumnFamilyDescriptorBuilder.newBuilder(family); 103 TableDescriptorBuilder tableDescBuilder = 104 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 105 admin.createTable(tableDescBuilder.build()); 106 // test simple get and put 107 Put put = new Put(row); 108 put.addColumn(family, qualifier, value); 109 table.put(put); 110 Get get = new Get(row); 111 Result result = table.get(get); 112 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 113 Assert.assertArrayEquals(value, result.getValue(family, qualifier)); 114 } finally { 115 admin.close(); 116 table.close(); 117 } 118 } 119 120 @Test 121 public void testMasterSwitch() throws Exception { 122 // get an admin instance and issue some request first 123 Connection conn = TEST_UTIL.getConnection(); 124 Admin admin = conn.getAdmin(); 125 LOG.debug("Tables: " + admin.listTableDescriptors()); 126 try { 127 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 128 // switch active master 129 HMaster master = cluster.getMaster(); 130 master.stopMaster(); 131 while (!master.isShutDown()) { 132 Thread.sleep(200); 133 } 134 while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) { 135 Thread.sleep(200); 136 } 137 // confirm client access still works 138 Assert.assertTrue(admin.balance(false)); 139 } finally { 140 admin.close(); 141 } 142 } 143 144 @Test 145 public void testMetaRegionMove() throws Exception { 146 TableName tn = TableName.valueOf(name.getMethodName()); 147 // create table 148 Connection conn = TEST_UTIL.getConnection(); 149 Admin admin = conn.getAdmin(); 150 HTable table = (HTable) conn.getTable(tn); 151 try { 152 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 153 ColumnFamilyDescriptorBuilder cfDescBuilder = 154 ColumnFamilyDescriptorBuilder.newBuilder(family); 155 TableDescriptorBuilder tableDescBuilder = 156 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 157 admin.createTable(tableDescBuilder.build()); 158 // issue some requests to cache the region location 159 Put put = new Put(row); 160 put.addColumn(family, qualifier, value); 161 table.put(put); 162 Get get = new Get(row); 163 Result result = table.get(get); 164 // move meta region and confirm client could detect 165 byte[] destServerName = null; 166 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 167 ServerName name = rst.getRegionServer().getServerName(); 168 if (!name.equals(cluster.getServerHoldingMeta())) { 169 destServerName = Bytes.toBytes(name.getServerName()); 170 break; 171 } 172 } 173 admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName); 174 LOG.debug("Finished moving meta"); 175 // invalidate client cache 176 RegionInfo region = 177 table.getRegionLocator().getRegionLocation(row).getRegion(); 178 ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName()); 179 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 180 ServerName name = rst.getRegionServer().getServerName(); 181 if (!name.equals(currentServer)) { 182 destServerName = Bytes.toBytes(name.getServerName()); 183 break; 184 } 185 } 186 admin.move(region.getEncodedNameAsBytes(), destServerName); 187 LOG.debug("Finished moving user region"); 188 put = new Put(row); 189 put.addColumn(family, qualifier, newVal); 190 table.put(put); 191 result = table.get(get); 192 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 193 Assert.assertArrayEquals(newVal, result.getValue(family, qualifier)); 194 } finally { 195 admin.close(); 196 table.close(); 197 } 198 } 199 200 @Test 201 public void testMetaMoveDuringClientZkClusterRestart() throws Exception { 202 TableName tn = TableName.valueOf(name.getMethodName()); 203 // create table 204 ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); 205 Admin admin = conn.getAdmin(); 206 HTable table = (HTable) conn.getTable(tn); 207 try { 208 ColumnFamilyDescriptorBuilder cfDescBuilder = 209 ColumnFamilyDescriptorBuilder.newBuilder(family); 210 TableDescriptorBuilder tableDescBuilder = 211 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 212 admin.createTable(tableDescBuilder.build()); 213 // put some data 214 Put put = new Put(row); 215 put.addColumn(family, qualifier, value); 216 table.put(put); 217 // invalid connection cache 218 conn.clearRegionCache(); 219 // stop client zk cluster 220 clientZkCluster.shutdown(); 221 // stop current meta server and confirm the server shutdown process 222 // is not affected by client ZK crash 223 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 224 int metaServerId = cluster.getServerWithMeta(); 225 HRegionServer metaServer = cluster.getRegionServer(metaServerId); 226 metaServer.stop("Stop current RS holding meta region"); 227 while (!metaServer.isShutDown()) { 228 Thread.sleep(200); 229 } 230 // wait for meta region online 231 AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(), 232 RegionInfoBuilder.FIRST_META_REGIONINFO); 233 // wait some long time to make sure we will retry sync data to client ZK until data set 234 Thread.sleep(10000); 235 clientZkCluster.startup(clientZkDir); 236 // new request should pass 237 Get get = new Get(row); 238 Result result = table.get(get); 239 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 240 Assert.assertArrayEquals(value, result.getValue(family, qualifier)); 241 } finally { 242 admin.close(); 243 table.close(); 244 } 245 } 246 247 @Test 248 public void testAsyncTable() throws Exception { 249 TableName tn = TableName.valueOf(name.getMethodName()); 250 ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 251 TableDescriptorBuilder tableDescBuilder = 252 TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build()); 253 try (AsyncConnection ASYNC_CONN = 254 ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { 255 ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get(); 256 AsyncTable<?> table = ASYNC_CONN.getTable(tn); 257 // put some data 258 Put put = new Put(row); 259 put.addColumn(family, qualifier, value); 260 table.put(put).get(); 261 // get and verify 262 Get get = new Get(row); 263 Result result = table.get(get).get(); 264 LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier))); 265 Assert.assertArrayEquals(value, result.getValue(family, qualifier)); 266 } 267 } 268}