001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022
023import java.io.File;
024import java.io.IOException;
025import org.apache.commons.io.FileUtils;
026import org.apache.hadoop.hbase.HBaseTestingUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
030import org.apache.hadoop.hbase.StartTestingClusterOption;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.TableNameTestExtension;
033import org.apache.hadoop.hbase.master.HMaster;
034import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
035import org.apache.hadoop.hbase.regionserver.HRegionServer;
036import org.apache.hadoop.hbase.testclassification.ClientTests;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
040import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
041import org.junit.jupiter.api.AfterAll;
042import org.junit.jupiter.api.BeforeAll;
043import org.junit.jupiter.api.BeforeEach;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.Test;
046import org.junit.jupiter.api.extension.RegisterExtension;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050@Tag(ClientTests.TAG)
051@Tag(MediumTests.TAG)
052public class TestSeparateClientZKCluster {
053  private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
054  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
055  private static final File clientZkDir =
056    new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString());
057  private static final int ZK_SESSION_TIMEOUT = 5000;
058  private static MiniZooKeeperCluster clientZkCluster;
059
060  private final byte[] family = Bytes.toBytes("cf");
061  private final byte[] qualifier = Bytes.toBytes("c1");
062  private final byte[] row = Bytes.toBytes("row");
063  private final byte[] value = Bytes.toBytes("v1");
064  private final byte[] newVal = Bytes.toBytes("v2");
065
066  @RegisterExtension
067  private TableNameTestExtension name = new TableNameTestExtension();
068
069  @BeforeAll
070  public static void beforeAllTests() throws Exception {
071    int clientZkPort = 21828;
072    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
073    clientZkCluster.setDefaultClientPort(clientZkPort);
074    clientZkCluster.startup(clientZkDir);
075    // start log counter
076    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", 3);
077    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
078    // core settings for testing client ZK cluster
079    TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
080      ZKConnectionRegistry.class, ConnectionRegistry.class);
081    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
082    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
083    // reduce zk session timeout to easier trigger session expiration
084    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
085    // Start a cluster with 2 masters and 3 regionservers.
086    StartTestingClusterOption option =
087      StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
088    TEST_UTIL.startMiniCluster(option);
089  }
090
091  @AfterAll
092  public static void afterAllTests() throws Exception {
093    TEST_UTIL.shutdownMiniCluster();
094    clientZkCluster.shutdown();
095    FileUtils.deleteDirectory(clientZkDir);
096  }
097
098  @BeforeEach
099  public void setUp() throws IOException {
100    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
101      waitForNewMasterUpAndAddressSynced(admin);
102    }
103  }
104
105  private void waitForNewMasterUpAndAddressSynced(Admin admin) {
106    TEST_UTIL.waitFor(30000, () -> {
107      try {
108        return admin.listNamespaces().length > 0;
109      } catch (Exception e) {
110        LOG.warn("failed to list namespaces", e);
111        return false;
112      }
113    });
114  }
115
116  @Test
117  public void testBasicOperation() throws Exception {
118    TableName tn = name.getTableName();
119    // create table
120    Connection conn = TEST_UTIL.getConnection();
121    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
122      ColumnFamilyDescriptorBuilder cfDescBuilder =
123        ColumnFamilyDescriptorBuilder.newBuilder(family);
124      TableDescriptorBuilder tableDescBuilder =
125        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
126      admin.createTable(tableDescBuilder.build());
127      // test simple get and put
128      Put put = new Put(row);
129      put.addColumn(family, qualifier, value);
130      table.put(put);
131      Get get = new Get(row);
132      Result result = table.get(get);
133      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
134      assertArrayEquals(value, result.getValue(family, qualifier));
135    }
136  }
137
138  @Test
139  public void testMasterSwitch() throws Exception {
140    // get an admin instance and issue some request first
141    Connection conn = TEST_UTIL.getConnection();
142    try (Admin admin = conn.getAdmin()) {
143      LOG.debug("Tables: " + admin.listTableDescriptors());
144      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
145      // switch active master
146      HMaster master = cluster.getMaster();
147      master.stopMaster();
148      LOG.info("Stopped master {}", master.getServerName());
149      TEST_UTIL.waitFor(30000, () -> !master.isAlive());
150      LOG.info("Shutdown master {}", master.getServerName());
151      TEST_UTIL.waitFor(30000,
152        () -> cluster.getMaster() != null && cluster.getMaster().isInitialized());
153      LOG.info("Got master {}", cluster.getMaster().getServerName());
154      // confirm client access still works
155      waitForNewMasterUpAndAddressSynced(admin);
156    }
157  }
158
159  @Test
160  public void testMetaRegionMove() throws Exception {
161    TableName tn = name.getTableName();
162    // create table
163    Connection conn = TEST_UTIL.getConnection();
164    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn);
165      RegionLocator locator = conn.getRegionLocator(tn)) {
166      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
167      ColumnFamilyDescriptorBuilder cfDescBuilder =
168        ColumnFamilyDescriptorBuilder.newBuilder(family);
169      TableDescriptorBuilder tableDescBuilder =
170        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
171      admin.createTable(tableDescBuilder.build());
172      // issue some requests to cache the region location
173      Put put = new Put(row);
174      put.addColumn(family, qualifier, value);
175      table.put(put);
176      Get get = new Get(row);
177      Result result = table.get(get);
178      // move meta region and confirm client could detect
179      ServerName destServerName = null;
180      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
181        ServerName name = rst.getRegionServer().getServerName();
182        if (!name.equals(cluster.getServerHoldingMeta())) {
183          destServerName = name;
184          break;
185        }
186      }
187      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
188      LOG.debug("Finished moving meta");
189      // invalidate client cache
190      RegionInfo region = locator.getRegionLocation(row).getRegion();
191      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
192      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
193        ServerName name = rst.getRegionServer().getServerName();
194        if (!name.equals(currentServer)) {
195          destServerName = name;
196          break;
197        }
198      }
199      admin.move(region.getEncodedNameAsBytes(), destServerName);
200      LOG.debug("Finished moving user region");
201      put = new Put(row);
202      put.addColumn(family, qualifier, newVal);
203      table.put(put);
204      result = table.get(get);
205      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
206      assertArrayEquals(newVal, result.getValue(family, qualifier));
207    }
208  }
209
210  @Test
211  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
212    TableName tn = name.getTableName();
213    // create table
214    Connection conn = TEST_UTIL.getConnection();
215    try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
216      ColumnFamilyDescriptorBuilder cfDescBuilder =
217        ColumnFamilyDescriptorBuilder.newBuilder(family);
218      TableDescriptorBuilder tableDescBuilder =
219        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
220      admin.createTable(tableDescBuilder.build());
221      // put some data
222      Put put = new Put(row);
223      put.addColumn(family, qualifier, value);
224      table.put(put);
225      // invalid connection cache
226      conn.clearRegionLocationCache();
227      // stop client zk cluster
228      clientZkCluster.shutdown();
229      // stop current meta server and confirm the server shutdown process
230      // is not affected by client ZK crash
231      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
232      int metaServerId = cluster.getServerWithMeta();
233      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
234      metaServer.stop("Stop current RS holding meta region");
235      while (metaServer.isAlive()) {
236        Thread.sleep(200);
237      }
238      // wait for meta region online
239      AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
240        RegionInfoBuilder.FIRST_META_REGIONINFO);
241      // wait some long time to make sure we will retry sync data to client ZK until data set
242      Thread.sleep(10000);
243      clientZkCluster.startup(clientZkDir);
244      // new request should pass
245      Get get = new Get(row);
246      Result result = table.get(get);
247      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
248      assertArrayEquals(value, result.getValue(family, qualifier));
249    }
250  }
251
252  @Test
253  public void testAsyncTable() throws Exception {
254    TableName tn = name.getTableName();
255    ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
256    TableDescriptorBuilder tableDescBuilder =
257      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
258    try (AsyncConnection ASYNC_CONN =
259      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
260      ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
261      AsyncTable<?> table = ASYNC_CONN.getTable(tn);
262      // put some data
263      Put put = new Put(row);
264      put.addColumn(family, qualifier, value);
265      table.put(put).get();
266      // get and verify
267      Get get = new Get(row);
268      Result result = table.get(get).get();
269      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
270      assertArrayEquals(value, result.getValue(family, qualifier));
271    }
272  }
273
274  @Test
275  public void testChangeMetaReplicaCount() throws Exception {
276    Admin admin = TEST_UTIL.getAdmin();
277    try (RegionLocator locator =
278      TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
279      assertEquals(1, locator.getAllRegionLocations().size());
280      HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3);
281      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 3);
282      HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 2);
283      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 2);
284      HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 1);
285      TEST_UTIL.waitFor(30000, () -> locator.getAllRegionLocations().size() == 1);
286    }
287  }
288}