001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.File;
021import org.apache.commons.io.FileUtils;
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.HBaseTestingUtility;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.MiniHBaseCluster;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.StartMiniClusterOption;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.master.HMaster;
030import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
031import org.apache.hadoop.hbase.regionserver.HRegionServer;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
035import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
036import org.junit.AfterClass;
037import org.junit.Assert;
038import org.junit.BeforeClass;
039import org.junit.ClassRule;
040import org.junit.Rule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.junit.rules.TestName;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047@Category(MediumTests.class)
048public class TestSeparateClientZKCluster {
049  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
050  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
051  private static final File clientZkDir =
052    new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString());
053  private static final int ZK_SESSION_TIMEOUT = 5000;
054  private static MiniZooKeeperCluster clientZkCluster;
055
056  private final byte[] family = Bytes.toBytes("cf");
057  private final byte[] qualifier = Bytes.toBytes("c1");
058  private final byte[] row = Bytes.toBytes("row");
059  private final byte[] value = Bytes.toBytes("v1");
060  private final byte[] newVal = Bytes.toBytes("v2");
061
062  @Rule
063  public TestName name = new TestName();
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
068
069  @BeforeClass
070  public static void beforeAllTests() throws Exception {
071    int clientZkPort = 21828;
072    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
073    clientZkCluster.setDefaultClientPort(clientZkPort);
074    clientZkCluster.startup(clientZkDir);
075    // reduce the retry number and start log counter
076    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
077    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
078    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
079    // core settings for testing client ZK cluster
080    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
081    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
082    // reduce zk session timeout to easier trigger session expiration
083    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
084    // Start a cluster with 2 masters and 3 regionservers.
085    StartMiniClusterOption option = StartMiniClusterOption.builder()
086        .numMasters(2).numRegionServers(3).numDataNodes(3).build();
087    TEST_UTIL.startMiniCluster(option);
088  }
089
090  @AfterClass
091  public static void afterAllTests() throws Exception {
092    TEST_UTIL.shutdownMiniCluster();
093    clientZkCluster.shutdown();
094    FileUtils.deleteDirectory(clientZkDir);
095  }
096
097  @Test
098  public void testBasicOperation() throws Exception {
099    TableName tn = TableName.valueOf(name.getMethodName());
100    // create table
101    Connection conn = TEST_UTIL.getConnection();
102    Admin admin = conn.getAdmin();
103    HTable table = (HTable) conn.getTable(tn);
104    try {
105      ColumnFamilyDescriptorBuilder cfDescBuilder =
106          ColumnFamilyDescriptorBuilder.newBuilder(family);
107      TableDescriptorBuilder tableDescBuilder =
108          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
109      admin.createTable(tableDescBuilder.build());
110      // test simple get and put
111      Put put = new Put(row);
112      put.addColumn(family, qualifier, value);
113      table.put(put);
114      Get get = new Get(row);
115      Result result = table.get(get);
116      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
117      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
118    } finally {
119      admin.close();
120      table.close();
121    }
122  }
123
124  @Test
125  public void testMasterSwitch() throws Exception {
126    // get an admin instance and issue some request first
127    Connection conn = TEST_UTIL.getConnection();
128    Admin admin = conn.getAdmin();
129    LOG.debug("Tables: " + admin.listTableDescriptors());
130    try {
131      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
132      // switch active master
133      HMaster master = cluster.getMaster();
134      master.stopMaster();
135      LOG.info("Stopped master {}", master.getServerName());
136      while (!master.isShutDown()) {
137        Thread.sleep(200);
138      }
139      LOG.info("Shutdown master {}", master.getServerName());
140      while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) {
141        LOG.info("Get master {}", cluster.getMaster() == null? "null":
142          cluster.getMaster().getServerName());
143        Thread.sleep(200);
144      }
145      LOG.info("Got master {}", cluster.getMaster().getServerName());
146      // confirm client access still works
147      Assert.assertTrue(admin.balance(false));
148    } finally {
149      admin.close();
150    }
151  }
152
153  @Test
154  public void testMetaRegionMove() throws Exception {
155    TableName tn = TableName.valueOf(name.getMethodName());
156    // create table
157    Connection conn = TEST_UTIL.getConnection();
158    Admin admin = conn.getAdmin();
159    HTable table = (HTable) conn.getTable(tn);
160    try {
161      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
162      ColumnFamilyDescriptorBuilder cfDescBuilder =
163          ColumnFamilyDescriptorBuilder.newBuilder(family);
164      TableDescriptorBuilder tableDescBuilder =
165          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
166      admin.createTable(tableDescBuilder.build());
167      // issue some requests to cache the region location
168      Put put = new Put(row);
169      put.addColumn(family, qualifier, value);
170      table.put(put);
171      Get get = new Get(row);
172      Result result = table.get(get);
173      // move meta region and confirm client could detect
174      ServerName destServerName = null;
175      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
176        ServerName name = rst.getRegionServer().getServerName();
177        if (!name.equals(cluster.getServerHoldingMeta())) {
178          destServerName = name;
179          break;
180        }
181      }
182      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
183      LOG.debug("Finished moving meta");
184      // invalidate client cache
185      RegionInfo region =
186          table.getRegionLocator().getRegionLocation(row).getRegion();
187      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
188      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
189        ServerName name = rst.getRegionServer().getServerName();
190        if (!name.equals(currentServer)) {
191          destServerName = name;
192          break;
193        }
194      }
195      admin.move(region.getEncodedNameAsBytes(), destServerName);
196      LOG.debug("Finished moving user region");
197      put = new Put(row);
198      put.addColumn(family, qualifier, newVal);
199      table.put(put);
200      result = table.get(get);
201      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
202      Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
203    } finally {
204      admin.close();
205      table.close();
206    }
207  }
208
209  @Test
210  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
211    TableName tn = TableName.valueOf(name.getMethodName());
212    // create table
213    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
214    Admin admin = conn.getAdmin();
215    HTable table = (HTable) conn.getTable(tn);
216    try {
217      ColumnFamilyDescriptorBuilder cfDescBuilder =
218          ColumnFamilyDescriptorBuilder.newBuilder(family);
219      TableDescriptorBuilder tableDescBuilder =
220          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
221      admin.createTable(tableDescBuilder.build());
222      // put some data
223      Put put = new Put(row);
224      put.addColumn(family, qualifier, value);
225      table.put(put);
226      // invalid connection cache
227      conn.clearRegionLocationCache();
228      // stop client zk cluster
229      clientZkCluster.shutdown();
230      // stop current meta server and confirm the server shutdown process
231      // is not affected by client ZK crash
232      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
233      int metaServerId = cluster.getServerWithMeta();
234      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
235      metaServer.stop("Stop current RS holding meta region");
236      while (!metaServer.isShutDown()) {
237        Thread.sleep(200);
238      }
239      // wait for meta region online
240      AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
241        RegionInfoBuilder.FIRST_META_REGIONINFO);
242      // wait some long time to make sure we will retry sync data to client ZK until data set
243      Thread.sleep(10000);
244      clientZkCluster.startup(clientZkDir);
245      // new request should pass
246      Get get = new Get(row);
247      Result result = table.get(get);
248      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
249      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
250    } finally {
251      admin.close();
252      table.close();
253    }
254  }
255
256  @Test
257  public void testAsyncTable() throws Exception {
258    TableName tn = TableName.valueOf(name.getMethodName());
259    ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
260    TableDescriptorBuilder tableDescBuilder =
261        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
262    try (AsyncConnection ASYNC_CONN =
263        ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
264      ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
265      AsyncTable<?> table = ASYNC_CONN.getTable(tn);
266      // put some data
267      Put put = new Put(row);
268      put.addColumn(family, qualifier, value);
269      table.put(put).get();
270      // get and verify
271      Get get = new Get(row);
272      Result result = table.get(get).get();
273      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
274      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
275    }
276  }
277}