001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import edu.umd.cs.findbugs.annotations.NonNull;
025import java.util.List;
026import java.util.Map;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.client.Admin;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.client.ResultScanner;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptor;
036import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
037import org.apache.hadoop.hbase.coordination.ZkSplitLogWorkerCoordination;
038import org.apache.hadoop.hbase.master.HMaster;
039import org.apache.hadoop.hbase.master.LoadBalancer;
040import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.testclassification.MiscTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.CommonFSUtils;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
046import org.apache.hadoop.hbase.zookeeper.ZKUtil;
047import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
048import org.apache.zookeeper.KeeperException;
049import org.junit.jupiter.api.AfterAll;
050import org.junit.jupiter.api.AfterEach;
051import org.junit.jupiter.api.BeforeAll;
052import org.junit.jupiter.api.BeforeEach;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055import org.junit.jupiter.api.TestInfo;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059@Tag(MiscTests.TAG)
060@Tag(MediumTests.TAG)
061public class TestZooKeeper {
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeper.class);
064
065  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
066
067  @BeforeAll
068  public static void setUpBeforeClass() throws Exception {
069    // Test we can first start the ZK cluster by itself
070    Configuration conf = TEST_UTIL.getConfiguration();
071    TEST_UTIL.startMiniDFSCluster(2);
072    TEST_UTIL.startMiniZKCluster();
073    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
074      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
075    conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
076    conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MockLoadBalancer.class,
077      LoadBalancer.class);
078    TEST_UTIL.startMiniDFSCluster(2);
079  }
080
081  @AfterAll
082  public static void tearDownAfterClass() throws Exception {
083    TEST_UTIL.shutdownMiniCluster();
084  }
085
086  @BeforeEach
087  public void setUp() throws Exception {
088    StartTestingClusterOption option =
089      StartTestingClusterOption.builder().numMasters(2).numRegionServers(2).build();
090    TEST_UTIL.startMiniHBaseCluster(option);
091  }
092
093  @AfterEach
094  public void after() throws Exception {
095    try {
096      TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(10000);
097      // Some regionserver could fail to delete its znode.
098      // So shutdown could hang. Let's kill them all instead.
099      TEST_UTIL.getHBaseCluster().killAll();
100
101      // Still need to clean things up
102      TEST_UTIL.shutdownMiniHBaseCluster();
103    } finally {
104      TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
105        true);
106      ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
107    }
108  }
109
110  @Test
111  public void testRegionServerSessionExpired(TestInfo testInfo) throws Exception {
112    LOG.info("Starting " + testInfo.getTestMethod().get().getName());
113    TEST_UTIL.expireRegionServerSession(0);
114    testSanity(testInfo.getTestMethod().get().getName());
115  }
116
117  @Test
118  public void testMasterSessionExpired(TestInfo testInfo) throws Exception {
119    LOG.info("Starting " + testInfo.getTestMethod().get().getName());
120    TEST_UTIL.expireMasterSession();
121    testSanity(testInfo.getTestMethod().get().getName());
122  }
123
124  /**
125   * Master recovery when the znode already exists. Internally, this test differs from
126   * {@link #testMasterSessionExpired} because here the master znode will exist in ZK.
127   */
128  @Test
129  public void testMasterZKSessionRecoveryFailure(TestInfo testInfo) throws Exception {
130    LOG.info("Starting " + testInfo.getTestMethod().get().getName());
131    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
132    HMaster m = cluster.getMaster();
133    m.abort("Test recovery from zk session expired", new KeeperException.SessionExpiredException());
134    assertTrue(m.isStopped()); // Master doesn't recover any more
135    testSanity(testInfo.getTestMethod().get().getName());
136  }
137
138  /**
139   * Make sure we can use the cluster
140   */
141  private void testSanity(final String testName) throws Exception {
142    String tableName = testName + "_" + EnvironmentEdgeManager.currentTime();
143    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
144      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("fam")).build();
145    LOG.info("Creating table " + tableName);
146    Admin admin = TEST_UTIL.getAdmin();
147    try {
148      admin.createTable(desc);
149    } finally {
150      admin.close();
151    }
152
153    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
154    Put put = new Put(Bytes.toBytes("testrow"));
155    put.addColumn(Bytes.toBytes("fam"), Bytes.toBytes("col"), Bytes.toBytes("testdata"));
156    LOG.info("Putting table " + tableName);
157    table.put(put);
158    table.close();
159  }
160
161  /**
162   * Tests that the master does not call retainAssignment after recovery from expired zookeeper
163   * session. Without the HBASE-6046 fix master always tries to assign all the user regions by
164   * calling retainAssignment.
165   */
166  @Test
167  public void testRegionAssignmentAfterMasterRecoveryDueToZKExpiry(TestInfo testInfo)
168    throws Exception {
169    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
170    cluster.startRegionServer();
171    cluster.waitForActiveAndReadyMaster(10000);
172    HMaster m = cluster.getMaster();
173    final ZKWatcher zkw = m.getZooKeeper();
174    // now the cluster is up. So assign some regions.
175    try (Admin admin = TEST_UTIL.getAdmin()) {
176      byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"),
177        Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
178        Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j") };
179      TableDescriptor htd = TableDescriptorBuilder
180        .newBuilder(TableName.valueOf(testInfo.getTestMethod().get().getName()))
181        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
182      admin.createTable(htd, SPLIT_KEYS);
183      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
184      m.getZooKeeper().close();
185      MockLoadBalancer.retainAssignCalled = false;
186      final int expectedNumOfListeners = countPermanentListeners(zkw);
187      // the master could already been aborted by some background tasks but here we call abort
188      // directly to make sure this will happen
189      m.abort("Test recovery from zk session expired",
190        new KeeperException.SessionExpiredException());
191      // it is possible that our abort call above returned earlier because of someone else has
192      // already called abort, but it is possible that it has not finished the abort call yet so the
193      // isStopped flag is still false, let's wait for sometime.
194      TEST_UTIL.waitFor(5000, () -> m.isStopped()); // Master doesn't recover any more
195
196      // The recovered master should not call retainAssignment, as it is not a
197      // clean startup.
198      assertFalse(MockLoadBalancer.retainAssignCalled, "Retain assignment should not be called");
199      // number of listeners should be same as the value before master aborted
200      // wait for new master is initialized
201      cluster.waitForActiveAndReadyMaster(120000);
202      final HMaster newMaster = cluster.getMasterThread().getMaster();
203      assertEquals(expectedNumOfListeners, countPermanentListeners(newMaster.getZooKeeper()));
204    }
205  }
206
207  /**
208   * Count listeners in zkw excluding listeners, that belongs to workers or other temporary
209   * processes.
210   */
211  private int countPermanentListeners(ZKWatcher watcher) {
212    return countListeners(watcher, ZkSplitLogWorkerCoordination.class);
213  }
214
215  /**
216   * Count listeners in zkw excluding provided classes
217   */
218  private int countListeners(ZKWatcher watcher, Class<?>... exclude) {
219    int cnt = 0;
220    for (Object o : watcher.getListeners()) {
221      boolean skip = false;
222      for (Class<?> aClass : exclude) {
223        if (aClass.isAssignableFrom(o.getClass())) {
224          skip = true;
225          break;
226        }
227      }
228      if (!skip) {
229        cnt += 1;
230      }
231    }
232    return cnt;
233  }
234
235  /**
236   * Tests whether the logs are split when master recovers from a expired zookeeper session and an
237   * RS goes down.
238   */
239  @Test
240  public void testLogSplittingAfterMasterRecoveryDueToZKExpiry(TestInfo testInfo) throws Exception {
241    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
242    cluster.startRegionServer();
243    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
244    byte[] family = Bytes.toBytes("col");
245    try (Admin admin = TEST_UTIL.getAdmin()) {
246      byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("1"), Bytes.toBytes("2"),
247        Bytes.toBytes("3"), Bytes.toBytes("4"), Bytes.toBytes("5") };
248      TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
249        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
250      admin.createTable(htd, SPLIT_KEYS);
251    }
252    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
253    HMaster m = cluster.getMaster();
254    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
255      int numberOfPuts;
256      for (numberOfPuts = 0; numberOfPuts < 6; numberOfPuts++) {
257        Put p = new Put(Bytes.toBytes(numberOfPuts));
258        p.addColumn(Bytes.toBytes("col"), Bytes.toBytes("ql"),
259          Bytes.toBytes("value" + numberOfPuts));
260        table.put(p);
261      }
262      m.abort("Test recovery from zk session expired",
263        new KeeperException.SessionExpiredException());
264      assertTrue(m.isStopped()); // Master doesn't recover any more
265      cluster.killRegionServer(TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName());
266      // Without patch for HBASE-6046 this test case will always timeout
267      // with patch the test case should pass.
268      int numberOfRows = 0;
269      try (ResultScanner scanner = table.getScanner(new Scan())) {
270        while (scanner.next() != null) {
271          numberOfRows++;
272        }
273      }
274      assertEquals(numberOfPuts, numberOfRows, "Number of rows should be equal to number of puts.");
275    }
276  }
277
278  static class MockLoadBalancer extends SimpleLoadBalancer {
279    static boolean retainAssignCalled = false;
280
281    @Override
282    @NonNull
283    public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
284      List<ServerName> servers) throws HBaseIOException {
285      retainAssignCalled = true;
286      return super.retainAssignment(regions, servers);
287    }
288  }
289
290}