001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.File;
021import org.apache.commons.io.FileUtils;
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.HBaseTestingUtility;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.MiniHBaseCluster;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.StartMiniClusterOption;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.master.HMaster;
030import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
031import org.apache.hadoop.hbase.regionserver.HRegionServer;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
035import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
036import org.junit.AfterClass;
037import org.junit.Assert;
038import org.junit.BeforeClass;
039import org.junit.ClassRule;
040import org.junit.Rule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.junit.rules.TestName;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047@Category(MediumTests.class)
048public class TestSeparateClientZKCluster {
049  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
050  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
051  private static final File clientZkDir = new File("/tmp/TestSeparateClientZKCluster");
052  private static final int ZK_SESSION_TIMEOUT = 5000;
053  private static MiniZooKeeperCluster clientZkCluster;
054
055  private final byte[] family = Bytes.toBytes("cf");
056  private final byte[] qualifier = Bytes.toBytes("c1");
057  private final byte[] row = Bytes.toBytes("row");
058  private final byte[] value = Bytes.toBytes("v1");
059  private final byte[] newVal = Bytes.toBytes("v2");
060
061  @Rule
062  public TestName name = new TestName();
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066      HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
067
068  @BeforeClass
069  public static void beforeAllTests() throws Exception {
070    int clientZkPort = 21828;
071    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
072    clientZkCluster.setDefaultClientPort(clientZkPort);
073    clientZkCluster.startup(clientZkDir);
074    // reduce the retry number and start log counter
075    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
076    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
077    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
078    // core settings for testing client ZK cluster
079    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
080    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
081    // reduce zk session timeout to easier trigger session expiration
082    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
083    // Start a cluster with 2 masters and 3 regionservers.
084    StartMiniClusterOption option = StartMiniClusterOption.builder()
085        .numMasters(2).numRegionServers(3).numDataNodes(3).build();
086    TEST_UTIL.startMiniCluster(option);
087  }
088
089  @AfterClass
090  public static void afterAllTests() throws Exception {
091    TEST_UTIL.shutdownMiniCluster();
092    clientZkCluster.shutdown();
093    FileUtils.deleteDirectory(clientZkDir);
094  }
095
096  @Test
097  public void testBasicOperation() throws Exception {
098    TableName tn = TableName.valueOf(name.getMethodName());
099    // create table
100    Connection conn = TEST_UTIL.getConnection();
101    Admin admin = conn.getAdmin();
102    HTable table = (HTable) conn.getTable(tn);
103    try {
104      ColumnFamilyDescriptorBuilder cfDescBuilder =
105          ColumnFamilyDescriptorBuilder.newBuilder(family);
106      TableDescriptorBuilder tableDescBuilder =
107          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
108      admin.createTable(tableDescBuilder.build());
109      // test simple get and put
110      Put put = new Put(row);
111      put.addColumn(family, qualifier, value);
112      table.put(put);
113      Get get = new Get(row);
114      Result result = table.get(get);
115      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
116      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
117    } finally {
118      admin.close();
119      table.close();
120    }
121  }
122
123  @Test
124  public void testMasterSwitch() throws Exception {
125    // get an admin instance and issue some request first
126    Connection conn = TEST_UTIL.getConnection();
127    Admin admin = conn.getAdmin();
128    LOG.debug("Tables: " + admin.listTableDescriptors());
129    try {
130      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
131      // switch active master
132      HMaster master = cluster.getMaster();
133      master.stopMaster();
134      while (!master.isShutDown()) {
135        Thread.sleep(200);
136      }
137      while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) {
138        Thread.sleep(200);
139      }
140      // confirm client access still works
141      Assert.assertTrue(admin.balance(false));
142    } finally {
143      admin.close();
144    }
145  }
146
147  @Test
148  public void testMetaRegionMove() throws Exception {
149    TableName tn = TableName.valueOf(name.getMethodName());
150    // create table
151    Connection conn = TEST_UTIL.getConnection();
152    Admin admin = conn.getAdmin();
153    HTable table = (HTable) conn.getTable(tn);
154    try {
155      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
156      ColumnFamilyDescriptorBuilder cfDescBuilder =
157          ColumnFamilyDescriptorBuilder.newBuilder(family);
158      TableDescriptorBuilder tableDescBuilder =
159          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
160      admin.createTable(tableDescBuilder.build());
161      // issue some requests to cache the region location
162      Put put = new Put(row);
163      put.addColumn(family, qualifier, value);
164      table.put(put);
165      Get get = new Get(row);
166      Result result = table.get(get);
167      // move meta region and confirm client could detect
168      ServerName destServerName = null;
169      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
170        ServerName name = rst.getRegionServer().getServerName();
171        if (!name.equals(cluster.getServerHoldingMeta())) {
172          destServerName = name;
173          break;
174        }
175      }
176      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
177      LOG.debug("Finished moving meta");
178      // invalidate client cache
179      RegionInfo region =
180          table.getRegionLocator().getRegionLocation(row).getRegion();
181      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
182      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
183        ServerName name = rst.getRegionServer().getServerName();
184        if (!name.equals(currentServer)) {
185          destServerName = name;
186          break;
187        }
188      }
189      admin.move(region.getEncodedNameAsBytes(), destServerName);
190      LOG.debug("Finished moving user region");
191      put = new Put(row);
192      put.addColumn(family, qualifier, newVal);
193      table.put(put);
194      result = table.get(get);
195      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
196      Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
197    } finally {
198      admin.close();
199      table.close();
200    }
201  }
202
203  @Test
204  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
205    TableName tn = TableName.valueOf(name.getMethodName());
206    // create table
207    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
208    Admin admin = conn.getAdmin();
209    HTable table = (HTable) conn.getTable(tn);
210    try {
211      ColumnFamilyDescriptorBuilder cfDescBuilder =
212          ColumnFamilyDescriptorBuilder.newBuilder(family);
213      TableDescriptorBuilder tableDescBuilder =
214          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
215      admin.createTable(tableDescBuilder.build());
216      // put some data
217      Put put = new Put(row);
218      put.addColumn(family, qualifier, value);
219      table.put(put);
220      // invalid connection cache
221      conn.clearRegionLocationCache();
222      // stop client zk cluster
223      clientZkCluster.shutdown();
224      // stop current meta server and confirm the server shutdown process
225      // is not affected by client ZK crash
226      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
227      int metaServerId = cluster.getServerWithMeta();
228      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
229      metaServer.stop("Stop current RS holding meta region");
230      while (!metaServer.isShutDown()) {
231        Thread.sleep(200);
232      }
233      // wait for meta region online
234      AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
235        RegionInfoBuilder.FIRST_META_REGIONINFO);
236      // wait some long time to make sure we will retry sync data to client ZK until data set
237      Thread.sleep(10000);
238      clientZkCluster.startup(clientZkDir);
239      // new request should pass
240      Get get = new Get(row);
241      Result result = table.get(get);
242      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
243      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
244    } finally {
245      admin.close();
246      table.close();
247    }
248  }
249
250  @Test
251  public void testAsyncTable() throws Exception {
252    TableName tn = TableName.valueOf(name.getMethodName());
253    ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
254    TableDescriptorBuilder tableDescBuilder =
255        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
256    try (AsyncConnection ASYNC_CONN =
257        ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
258      ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
259      AsyncTable<?> table = ASYNC_CONN.getTable(tn);
260      // put some data
261      Put put = new Put(row);
262      put.addColumn(family, qualifier, value);
263      table.put(put).get();
264      // get and verify
265      Get get = new Get(row);
266      Result result = table.get(get).get();
267      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
268      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
269    }
270  }
271}