001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.File;
021import org.apache.commons.io.FileUtils;
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.HBaseTestingUtility;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.MiniHBaseCluster;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.master.HMaster;
029import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
030import org.apache.hadoop.hbase.regionserver.HRegionServer;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
034import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
035import org.junit.AfterClass;
036import org.junit.Assert;
037import org.junit.BeforeClass;
038import org.junit.ClassRule;
039import org.junit.Rule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.junit.rules.TestName;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Category(MediumTests.class)
047public class TestSeparateClientZKCluster {
048  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
049  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
050  private static final File clientZkDir = new File("/tmp/TestSeparateClientZKCluster");
051  private static final int ZK_SESSION_TIMEOUT = 5000;
052  private static MiniZooKeeperCluster clientZkCluster;
053
054  private final byte[] family = Bytes.toBytes("cf");
055  private final byte[] qualifier = Bytes.toBytes("c1");
056  private final byte[] row = Bytes.toBytes("row");
057  private final byte[] value = Bytes.toBytes("v1");
058  private final byte[] newVal = Bytes.toBytes("v2");
059
060  @Rule
061  public TestName name = new TestName();
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
066
067  @BeforeClass
068  public static void beforeAllTests() throws Exception {
069    int clientZkPort = 21828;
070    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
071    clientZkCluster.setDefaultClientPort(clientZkPort);
072    clientZkCluster.startup(clientZkDir);
073    // reduce the retry number and start log counter
074    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
075    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
076    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
077    // core settings for testing client ZK cluster
078    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
079    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
080    // reduce zk session timeout to easier trigger session expiration
081    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
082    // Start a cluster with 2 masters and 3 regionservers.
083    TEST_UTIL.startMiniCluster(2, 3);
084  }
085
086  @AfterClass
087  public static void afterAllTests() throws Exception {
088    TEST_UTIL.shutdownMiniCluster();
089    clientZkCluster.shutdown();
090    FileUtils.deleteDirectory(clientZkDir);
091  }
092
093  @Test
094  public void testBasicOperation() throws Exception {
095    TableName tn = TableName.valueOf(name.getMethodName());
096    // create table
097    Connection conn = TEST_UTIL.getConnection();
098    Admin admin = conn.getAdmin();
099    HTable table = (HTable) conn.getTable(tn);
100    try {
101      ColumnFamilyDescriptorBuilder cfDescBuilder =
102          ColumnFamilyDescriptorBuilder.newBuilder(family);
103      TableDescriptorBuilder tableDescBuilder =
104          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
105      admin.createTable(tableDescBuilder.build());
106      // test simple get and put
107      Put put = new Put(row);
108      put.addColumn(family, qualifier, value);
109      table.put(put);
110      Get get = new Get(row);
111      Result result = table.get(get);
112      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
113      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
114    } finally {
115      admin.close();
116      table.close();
117    }
118  }
119
120  @Test
121  public void testMasterSwitch() throws Exception {
122    // get an admin instance and issue some request first
123    Connection conn = TEST_UTIL.getConnection();
124    Admin admin = conn.getAdmin();
125    LOG.debug("Tables: " + admin.listTableDescriptors());
126    try {
127      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
128      // switch active master
129      HMaster master = cluster.getMaster();
130      master.stopMaster();
131      while (!master.isShutDown()) {
132        Thread.sleep(200);
133      }
134      while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) {
135        Thread.sleep(200);
136      }
137      // confirm client access still works
138      Assert.assertTrue(admin.balance(false));
139    } finally {
140      admin.close();
141    }
142  }
143
144  @Test
145  public void testMetaRegionMove() throws Exception {
146    TableName tn = TableName.valueOf(name.getMethodName());
147    // create table
148    Connection conn = TEST_UTIL.getConnection();
149    Admin admin = conn.getAdmin();
150    HTable table = (HTable) conn.getTable(tn);
151    try {
152      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
153      ColumnFamilyDescriptorBuilder cfDescBuilder =
154          ColumnFamilyDescriptorBuilder.newBuilder(family);
155      TableDescriptorBuilder tableDescBuilder =
156          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
157      admin.createTable(tableDescBuilder.build());
158      // issue some requests to cache the region location
159      Put put = new Put(row);
160      put.addColumn(family, qualifier, value);
161      table.put(put);
162      Get get = new Get(row);
163      Result result = table.get(get);
164      // move meta region and confirm client could detect
165      byte[] destServerName = null;
166      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
167        ServerName name = rst.getRegionServer().getServerName();
168        if (!name.equals(cluster.getServerHoldingMeta())) {
169          destServerName = Bytes.toBytes(name.getServerName());
170          break;
171        }
172      }
173      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
174      LOG.debug("Finished moving meta");
175      // invalidate client cache
176      RegionInfo region =
177          table.getRegionLocator().getRegionLocation(row).getRegion();
178      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
179      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
180        ServerName name = rst.getRegionServer().getServerName();
181        if (!name.equals(currentServer)) {
182          destServerName = Bytes.toBytes(name.getServerName());
183          break;
184        }
185      }
186      admin.move(region.getEncodedNameAsBytes(), destServerName);
187      LOG.debug("Finished moving user region");
188      put = new Put(row);
189      put.addColumn(family, qualifier, newVal);
190      table.put(put);
191      result = table.get(get);
192      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
193      Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
194    } finally {
195      admin.close();
196      table.close();
197    }
198  }
199
200  @Test
201  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
202    TableName tn = TableName.valueOf(name.getMethodName());
203    // create table
204    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
205    Admin admin = conn.getAdmin();
206    HTable table = (HTable) conn.getTable(tn);
207    try {
208      ColumnFamilyDescriptorBuilder cfDescBuilder =
209          ColumnFamilyDescriptorBuilder.newBuilder(family);
210      TableDescriptorBuilder tableDescBuilder =
211          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
212      admin.createTable(tableDescBuilder.build());
213      // put some data
214      Put put = new Put(row);
215      put.addColumn(family, qualifier, value);
216      table.put(put);
217      // invalid connection cache
218      conn.clearRegionCache();
219      // stop client zk cluster
220      clientZkCluster.shutdown();
221      // stop current meta server and confirm the server shutdown process
222      // is not affected by client ZK crash
223      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
224      int metaServerId = cluster.getServerWithMeta();
225      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
226      metaServer.stop("Stop current RS holding meta region");
227      while (!metaServer.isShutDown()) {
228        Thread.sleep(200);
229      }
230      // wait for meta region online
231      AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
232        RegionInfoBuilder.FIRST_META_REGIONINFO);
233      // wait some long time to make sure we will retry sync data to client ZK until data set
234      Thread.sleep(10000);
235      clientZkCluster.startup(clientZkDir);
236      // new request should pass
237      Get get = new Get(row);
238      Result result = table.get(get);
239      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
240      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
241    } finally {
242      admin.close();
243      table.close();
244    }
245  }
246
247  @Test
248  public void testAsyncTable() throws Exception {
249    TableName tn = TableName.valueOf(name.getMethodName());
250    ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
251    TableDescriptorBuilder tableDescBuilder =
252        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
253    try (AsyncConnection ASYNC_CONN =
254        ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
255      ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
256      AsyncTable<?> table = ASYNC_CONN.getTable(tn);
257      // put some data
258      Put put = new Put(row);
259      put.addColumn(family, qualifier, value);
260      table.put(put).get();
261      // get and verify
262      Get get = new Get(row);
263      Result result = table.get(get).get();
264      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
265      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
266    }
267  }
268}