001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.File;
025import org.apache.commons.io.FileUtils;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestingUtility;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.MiniHBaseCluster;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.StartMiniClusterOption;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.TableNameTestRule;
034import org.apache.hadoop.hbase.master.HMaster;
035import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
036import org.apache.hadoop.hbase.regionserver.HRegionServer;
037import org.apache.hadoop.hbase.testclassification.ClientTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Rule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Category({ ClientTests.class, MediumTests.class })
052public class TestSeparateClientZKCluster {
053  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
054  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
055  private static final File clientZkDir =
056    new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString());
057  private static final int ZK_SESSION_TIMEOUT = 5000;
058  private static MiniZooKeeperCluster clientZkCluster;
059
060  private final byte[] family = Bytes.toBytes("cf");
061  private final byte[] qualifier = Bytes.toBytes("c1");
062  private final byte[] row = Bytes.toBytes("row");
063  private final byte[] value = Bytes.toBytes("v1");
064  private final byte[] newVal = Bytes.toBytes("v2");
065
066  @Rule
067  public TableNameTestRule name = new TableNameTestRule();
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
072
073  @BeforeClass
074  public static void beforeAllTests() throws Exception {
075    int clientZkPort = 21828;
076    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
077    clientZkCluster.setDefaultClientPort(clientZkPort);
078    clientZkCluster.startup(clientZkDir);
079    // reduce the retry number and start log counter
080    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
081    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
082    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
083    // core settings for testing client ZK cluster
084    TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
085      ZKConnectionRegistry.class, ConnectionRegistry.class);
086    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
087    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
088    // reduce zk session timeout to easier trigger session expiration
089    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
090    // Start a cluster with 2 masters and 3 regionservers.
091    StartMiniClusterOption option =
092      StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
093    TEST_UTIL.startMiniCluster(option);
094  }
095
096  @AfterClass
097  public static void afterAllTests() throws Exception {
098    TEST_UTIL.shutdownMiniCluster();
099    clientZkCluster.shutdown();
100    FileUtils.deleteDirectory(clientZkDir);
101  }
102
103  @Test
104  public void testBasicOperation() throws Exception {
105    TableName tn = name.getTableName();
106    // create table
107    Connection conn = TEST_UTIL.getConnection();
108    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
109      ColumnFamilyDescriptorBuilder cfDescBuilder =
110        ColumnFamilyDescriptorBuilder.newBuilder(family);
111      TableDescriptorBuilder tableDescBuilder =
112        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
113      admin.createTable(tableDescBuilder.build());
114      // test simple get and put
115      Put put = new Put(row);
116      put.addColumn(family, qualifier, value);
117      table.put(put);
118      Get get = new Get(row);
119      Result result = table.get(get);
120      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
121      assertArrayEquals(value, result.getValue(family, qualifier));
122    }
123  }
124
125  @Test
126  public void testMasterSwitch() throws Exception {
127    // get an admin instance and issue some request first
128    Connection conn = TEST_UTIL.getConnection();
129    try (Admin admin = conn.getAdmin()) {
130      LOG.debug("Tables: " + admin.listTableDescriptors());
131      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
132      // switch active master
133      HMaster master = cluster.getMaster();
134      master.stopMaster();
135      LOG.info("Stopped master {}", master.getServerName());
136      while (!master.isShutDown()) {
137        Thread.sleep(200);
138      }
139      LOG.info("Shutdown master {}", master.getServerName());
140      while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) {
141        LOG.info("Get master {}",
142          cluster.getMaster() == null ? "null" : cluster.getMaster().getServerName());
143        Thread.sleep(200);
144      }
145      LOG.info("Got master {}", cluster.getMaster().getServerName());
146      // confirm client access still works
147      assertTrue(admin.balance(false));
148    }
149  }
150
151  @Test
152  public void testMetaRegionMove() throws Exception {
153    TableName tn = name.getTableName();
154    // create table
155    Connection conn = TEST_UTIL.getConnection();
156    try (Admin admin = conn.getAdmin();
157      Table table = conn.getTable(tn);
158      RegionLocator locator = conn.getRegionLocator(tn)) {
159      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
160      ColumnFamilyDescriptorBuilder cfDescBuilder =
161        ColumnFamilyDescriptorBuilder.newBuilder(family);
162      TableDescriptorBuilder tableDescBuilder =
163        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
164      admin.createTable(tableDescBuilder.build());
165      // issue some requests to cache the region location
166      Put put = new Put(row);
167      put.addColumn(family, qualifier, value);
168      table.put(put);
169      Get get = new Get(row);
170      Result result = table.get(get);
171      // move meta region and confirm client could detect
172      ServerName destServerName = null;
173      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
174        ServerName name = rst.getRegionServer().getServerName();
175        if (!name.equals(cluster.getServerHoldingMeta())) {
176          destServerName = name;
177          break;
178        }
179      }
180      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
181      LOG.debug("Finished moving meta");
182      // invalidate client cache
183      RegionInfo region = locator.getRegionLocation(row).getRegion();
184      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
185      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
186        ServerName name = rst.getRegionServer().getServerName();
187        if (!name.equals(currentServer)) {
188          destServerName = name;
189          break;
190        }
191      }
192      admin.move(region.getEncodedNameAsBytes(), destServerName);
193      LOG.debug("Finished moving user region");
194      put = new Put(row);
195      put.addColumn(family, qualifier, newVal);
196      table.put(put);
197      result = table.get(get);
198      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
199      assertArrayEquals(newVal, result.getValue(family, qualifier));
200    }
201  }
202
203  @Test
204  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
205    TableName tn = name.getTableName();
206    // create table
207    Connection conn = TEST_UTIL.getConnection();
208    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
209      ColumnFamilyDescriptorBuilder cfDescBuilder =
210        ColumnFamilyDescriptorBuilder.newBuilder(family);
211      TableDescriptorBuilder tableDescBuilder =
212        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
213      admin.createTable(tableDescBuilder.build());
214      // put some data
215      Put put = new Put(row);
216      put.addColumn(family, qualifier, value);
217      table.put(put);
218      // invalid connection cache
219      conn.clearRegionLocationCache();
220      // stop client zk cluster
221      clientZkCluster.shutdown();
222      // stop current meta server and confirm the server shutdown process
223      // is not affected by client ZK crash
224      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
225      int metaServerId = cluster.getServerWithMeta();
226      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
227      metaServer.stop("Stop current RS holding meta region");
228      while (!metaServer.isShutDown()) {
229        Thread.sleep(200);
230      }
231      // wait for meta region online
232      AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
233        RegionInfoBuilder.FIRST_META_REGIONINFO);
234      // wait some long time to make sure we will retry sync data to client ZK until data set
235      Thread.sleep(10000);
236      clientZkCluster.startup(clientZkDir);
237      // new request should pass
238      Get get = new Get(row);
239      Result result = table.get(get);
240      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
241      assertArrayEquals(value, result.getValue(family, qualifier));
242    }
243  }
244
245  @Test
246  public void testAsyncTable() throws Exception {
247    TableName tn = name.getTableName();
248    ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
249    TableDescriptorBuilder tableDescBuilder =
250      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
251    try (AsyncConnection ASYNC_CONN =
252      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
253      ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
254      AsyncTable<?> table = ASYNC_CONN.getTable(tn);
255      // put some data
256      Put put = new Put(row);
257      put.addColumn(family, qualifier, value);
258      table.put(put).get();
259      // get and verify
260      Get get = new Get(row);
261      Result result = table.get(get).get();
262      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
263      assertArrayEquals(value, result.getValue(family, qualifier));
264    }
265  }
266
267  @Test
268  public void testChangeMetaReplicaCount() throws Exception {
269    Admin admin = TEST_UTIL.getAdmin();
270    try (RegionLocator locator =
271      TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
272      assertEquals(1, locator.getAllRegionLocations().size());
273      HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 3);
274      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 3);
275      HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 2);
276      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 2);
277      HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 1);
278      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 1);
279    }
280  }
281}