001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022
023import java.io.File;
024import java.io.IOException;
025import org.apache.commons.io.FileUtils;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestingUtility;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.MiniHBaseCluster;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.StartMiniClusterOption;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.TableNameTestRule;
034import org.apache.hadoop.hbase.master.HMaster;
035import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
036import org.apache.hadoop.hbase.regionserver.HRegionServer;
037import org.apache.hadoop.hbase.testclassification.ClientTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
041import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
042import org.junit.AfterClass;
043import org.junit.Before;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Rule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@Category({ ClientTests.class, MediumTests.class })
053public class TestSeparateClientZKCluster {
054  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
055  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
056  private static final File clientZkDir =
057    new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString());
058  private static final int ZK_SESSION_TIMEOUT = 5000;
059  private static MiniZooKeeperCluster clientZkCluster;
060
061  private final byte[] family = Bytes.toBytes("cf");
062  private final byte[] qualifier = Bytes.toBytes("c1");
063  private final byte[] row = Bytes.toBytes("row");
064  private final byte[] value = Bytes.toBytes("v1");
065  private final byte[] newVal = Bytes.toBytes("v2");
066
067  @Rule
068  public TableNameTestRule name = new TableNameTestRule();
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
073
074  @BeforeClass
075  public static void beforeAllTests() throws Exception {
076    int clientZkPort = 21828;
077    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
078    clientZkCluster.setDefaultClientPort(clientZkPort);
079    clientZkCluster.startup(clientZkDir);
080    // start log counter
081    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", 3);
082    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
083    // core settings for testing client ZK cluster
084    TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
085      ZKConnectionRegistry.class, ConnectionRegistry.class);
086    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
087    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
088    // reduce zk session timeout to easier trigger session expiration
089    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
090    // Start a cluster with 2 masters and 3 regionservers.
091    StartMiniClusterOption option =
092      StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
093    TEST_UTIL.startMiniCluster(option);
094  }
095
096  @AfterClass
097  public static void afterAllTests() throws Exception {
098    TEST_UTIL.shutdownMiniCluster();
099    clientZkCluster.shutdown();
100    FileUtils.deleteDirectory(clientZkDir);
101  }
102
103  @Before
104  public void setUp() throws IOException {
105    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
106      waitForNewMasterUpAndAddressSynced(admin);
107    }
108  }
109
110  private void waitForNewMasterUpAndAddressSynced(Admin admin) {
111    TEST_UTIL.waitFor(30000, () -> {
112      try {
113        return admin.listNamespaces().length > 0;
114      } catch (Exception e) {
115        LOG.warn("failed to list namespaces", e);
116        return false;
117      }
118    });
119  }
120
121  @Test
122  public void testBasicOperation() throws Exception {
123    TableName tn = name.getTableName();
124    // create table
125    Connection conn = TEST_UTIL.getConnection();
126    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
127      ColumnFamilyDescriptorBuilder cfDescBuilder =
128        ColumnFamilyDescriptorBuilder.newBuilder(family);
129      TableDescriptorBuilder tableDescBuilder =
130        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
131      admin.createTable(tableDescBuilder.build());
132      // test simple get and put
133      Put put = new Put(row);
134      put.addColumn(family, qualifier, value);
135      table.put(put);
136      Get get = new Get(row);
137      Result result = table.get(get);
138      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
139      assertArrayEquals(value, result.getValue(family, qualifier));
140    }
141  }
142
143  @Test
144  public void testMasterSwitch() throws Exception {
145    // get an admin instance and issue some request first
146    Connection conn = TEST_UTIL.getConnection();
147    try (Admin admin = conn.getAdmin()) {
148      LOG.debug("Tables: " + admin.listTableDescriptors());
149      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
150      // switch active master
151      HMaster master = cluster.getMaster();
152      master.stopMaster();
153      LOG.info("Stopped master {}", master.getServerName());
154      TEST_UTIL.waitFor(30000, () -> !master.isAlive());
155      LOG.info("Shutdown master {}", master.getServerName());
156      TEST_UTIL.waitFor(30000,
157        () -> cluster.getMaster() != null && cluster.getMaster().isInitialized());
158      LOG.info("Got master {}", cluster.getMaster().getServerName());
159      // confirm client access still works
160      waitForNewMasterUpAndAddressSynced(admin);
161    }
162  }
163
164  @Test
165  public void testMetaRegionMove() throws Exception {
166    TableName tn = name.getTableName();
167    // create table
168    Connection conn = TEST_UTIL.getConnection();
169    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn);
170      RegionLocator locator = conn.getRegionLocator(tn)) {
171      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
172      ColumnFamilyDescriptorBuilder cfDescBuilder =
173        ColumnFamilyDescriptorBuilder.newBuilder(family);
174      TableDescriptorBuilder tableDescBuilder =
175        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
176      admin.createTable(tableDescBuilder.build());
177      // issue some requests to cache the region location
178      Put put = new Put(row);
179      put.addColumn(family, qualifier, value);
180      table.put(put);
181      Get get = new Get(row);
182      Result result = table.get(get);
183      // move meta region and confirm client could detect
184      ServerName destServerName = null;
185      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
186        ServerName name = rst.getRegionServer().getServerName();
187        if (!name.equals(cluster.getServerHoldingMeta())) {
188          destServerName = name;
189          break;
190        }
191      }
192      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
193      LOG.debug("Finished moving meta");
194      // invalidate client cache
195      RegionInfo region = locator.getRegionLocation(row).getRegion();
196      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
197      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
198        ServerName name = rst.getRegionServer().getServerName();
199        if (!name.equals(currentServer)) {
200          destServerName = name;
201          break;
202        }
203      }
204      admin.move(region.getEncodedNameAsBytes(), destServerName);
205      LOG.debug("Finished moving user region");
206      put = new Put(row);
207      put.addColumn(family, qualifier, newVal);
208      table.put(put);
209      result = table.get(get);
210      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
211      assertArrayEquals(newVal, result.getValue(family, qualifier));
212    }
213  }
214
215  @Test
216  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
217    TableName tn = name.getTableName();
218    // create table
219    Connection conn = TEST_UTIL.getConnection();
220    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
221      ColumnFamilyDescriptorBuilder cfDescBuilder =
222        ColumnFamilyDescriptorBuilder.newBuilder(family);
223      TableDescriptorBuilder tableDescBuilder =
224        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
225      admin.createTable(tableDescBuilder.build());
226      // put some data
227      Put put = new Put(row);
228      put.addColumn(family, qualifier, value);
229      table.put(put);
230      // invalid connection cache
231      conn.clearRegionLocationCache();
232      // stop client zk cluster
233      clientZkCluster.shutdown();
234      // stop current meta server and confirm the server shutdown process
235      // is not affected by client ZK crash
236      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
237      int metaServerId = cluster.getServerWithMeta();
238      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
239      metaServer.stop("Stop current RS holding meta region");
240      while (!metaServer.isShutDown()) {
241        Thread.sleep(200);
242      }
243      // wait for meta region online
244      AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
245        RegionInfoBuilder.FIRST_META_REGIONINFO);
246      // wait some long time to make sure we will retry sync data to client ZK until data set
247      Thread.sleep(10000);
248      clientZkCluster.startup(clientZkDir);
249      // new request should pass
250      Get get = new Get(row);
251      Result result = table.get(get);
252      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
253      assertArrayEquals(value, result.getValue(family, qualifier));
254    }
255  }
256
257  @Test
258  public void testAsyncTable() throws Exception {
259    TableName tn = name.getTableName();
260    ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
261    TableDescriptorBuilder tableDescBuilder =
262      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
263    try (AsyncConnection ASYNC_CONN =
264      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
265      ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
266      AsyncTable<?> table = ASYNC_CONN.getTable(tn);
267      // put some data
268      Put put = new Put(row);
269      put.addColumn(family, qualifier, value);
270      table.put(put).get();
271      // get and verify
272      Get get = new Get(row);
273      Result result = table.get(get).get();
274      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
275      assertArrayEquals(value, result.getValue(family, qualifier));
276    }
277  }
278
279  @Test
280  public void testChangeMetaReplicaCount() throws Exception {
281    Admin admin = TEST_UTIL.getAdmin();
282    try (RegionLocator locator =
283      TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
284      assertEquals(1, locator.getAllRegionLocations().size());
285      HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 3);
286      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 3);
287      HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 2);
288      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 2);
289      HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, 1);
290      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 1);
291    }
292  }
293}