/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.ServerState;
import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

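/**
 * Testcase for HBASE-22964: hold up the ServerCrashProcedure for one region server across a full
 * cluster restart and verify that no duplicate SCP can be scheduled for that server while the
 * first one is still running, and that its ServerStateNode is removed once the SCP finishes.
 */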
@Category({ MasterTests.class, LargeTests.class })
public class TestClusterRestartFailover extends AbstractTestRestartCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestClusterRestartFailover.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestClusterRestartFailover.class);
  private static final MetricsAssertHelper metricsHelper =
    CompatibilityFactory.getInstance(MetricsAssertHelper.class);

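  // SCP_LATCH blocks the ServerCrashProcedure for SERVER_FOR_TEST inside AssignmentManagerForTest
  // until the test counts it down. SERVER_FOR_TEST is the region server chosen to be held up,
  // picked so that it does not host hbase:namespace.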
  private static volatile CountDownLatch SCP_LATCH;
  private static ServerName SERVER_FOR_TEST;

  @Override
  protected boolean splitWALCoordinatedByZk() {
    return true;
  }

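  // Returns the ServerStateNode the active master's AssignmentManager tracks for the given
  // server, or null if there is none.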
  private ServerStateNode getServerStateNode(ServerName serverName) {
    return UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
      .getServerNode(serverName);
  }

  /**
   * Test for HBASE-22964
   */
  @Test
  public void test() throws Exception {
    setupCluster();
    setupTable();

    // Find a server that does not have hbase:namespace on it. This test holds up SCPs; if it
    // held up the server carrying hbase:namespace, master initialization would be blocked
    // because that table would never come online, and the test would fail.
    for (JVMClusterUtil.RegionServerThread rst : UTIL.getHBaseCluster()
      .getLiveRegionServerThreads()) {
      HRegionServer rs = rst.getRegionServer();
      if (rs.getRegions(TableName.NAMESPACE_TABLE_NAME).isEmpty()) {
        SERVER_FOR_TEST = rs.getServerName();
      }
    }
    UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
    ServerStateNode serverNode = getServerStateNode(SERVER_FOR_TEST);
    assertNotNull(serverNode);
    assertTrue("serverNode should be ONLINE when cluster runs normally",
      serverNode.isInState(ServerState.ONLINE));

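    // Arm the latch so that, after the restart, the SCP for SERVER_FOR_TEST blocks inside
    // AssignmentManagerForTest.getRegionsOnServer() until the test releases it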
    SCP_LATCH = new CountDownLatch(1);

    // Shut down the cluster and restart it, reusing the old region server ports
    List<Integer> ports =
      UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
        .map(serverName -> serverName.getPort()).collect(Collectors.toList());
    LOG.info("Shutting down cluster");
    UTIL.getHBaseCluster().killAll();
    UTIL.getHBaseCluster().waitUntilShutDown();
    LOG.info("Restarting cluster");
    UTIL.restartHBaseCluster(StartMiniClusterOption.builder().masterClass(HMasterForTest.class)
      .numMasters(1).numRegionServers(3).rsPorts(ports).build());
    LOG.info("Started cluster");
    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
    LOG.info("Started cluster master, waiting for {}", SERVER_FOR_TEST);
    UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
    serverNode = getServerStateNode(SERVER_FOR_TEST);
    assertFalse("serverNode should not be ONLINE during SCP processing",
      serverNode.isInState(ServerState.ONLINE));
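    // The restarted master should have scheduled an SCP for SERVER_FOR_TEST; submitting another
    // one for the same server while it is still running must be rejected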
    Optional<Procedure<?>> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
      .filter(p -> (p instanceof ServerCrashProcedure)
        && ((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST))
      .findAny();
    assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
    assertTrue("Submitting a second SCP for " + SERVER_FOR_TEST
      + " should be rejected while the first one is still running",
      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST)
        == Procedure.NO_PROC_ID);

    // Release the latch and wait for the held-up SCP to finish
    LOG.info("Releasing SCP latch");
    SCP_LATCH.countDown();
    UTIL.waitFor(60000, () -> procedure.get().isFinished());

    assertFalse(
      "Once the SCP for " + SERVER_FOR_TEST
        + " has finished, a new SCP for the same server name should be accepted",
      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST)
        == Procedure.NO_PROC_ID);
    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
      .getServerNode(SERVER_FOR_TEST);
    assertNull("serverNode should be deleted after SCP finished", serverNode);

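    // Verify via master metrics the total number of SCPs submitted; the successful expireServer()
    // call above adds one on top of those scheduled for the killed servers during the restart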
    MetricsMasterSource masterSource =
      UTIL.getHBaseCluster().getMaster().getMasterMetrics().getMetricsSource();
    metricsHelper.assertCounter(MetricsMasterSource.SERVER_CRASH_METRIC_PREFIX + "SubmittedCount",
      4, masterSource);
  }

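  // Starts a mini cluster that uses HMasterForTest, then waits for master initialization and for
  // all startup ServerCrashProcedures to complete.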
  private void setupCluster() throws Exception {
    LOG.info("Setup cluster");
    UTIL.startMiniCluster(StartMiniClusterOption.builder().masterClass(HMasterForTest.class)
      .numMasters(1).numRegionServers(3).build());
    LOG.info("Cluster is up");
    UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
    LOG.info("Master is up");
    // wait for all SCPs to finish
    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
      .noneMatch(p -> p instanceof ServerCrashProcedure));
    LOG.info("No SCPs");
  }

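  // Creates a multi-region table and loads data into it so the region servers carry regions that
  // the SCPs will have to recover.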
  private void setupTable() throws Exception {
    TableName tableName = TABLES[0];
    UTIL.createMultiRegionTable(tableName, FAMILY);
    UTIL.waitTableAvailable(tableName);
    Table table = UTIL.getConnection().getTable(tableName);
    for (int i = 0; i < 100; i++) {
      UTIL.loadTable(table, FAMILY);
    }
  }

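  /**
   * Master implementation that installs {@link AssignmentManagerForTest} so that
   * ServerCrashProcedure execution can be held up via {@code SCP_LATCH}.
   */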
  public static final class HMasterForTest extends HMaster {

    public HMasterForTest(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    protected AssignmentManager createAssignmentManager(MasterServices master,
      MasterRegion masterRegion) {
      return new AssignmentManagerForTest(master, masterRegion);
    }
  }

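  /**
   * AssignmentManager whose {@code getRegionsOnServer} blocks on {@code SCP_LATCH} for
   * {@code SERVER_FOR_TEST}, holding up the ServerCrashProcedure for that server until the test
   * counts the latch down.
   */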
  private static final class AssignmentManagerForTest extends AssignmentManager {

    public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion) {
      super(master, masterRegion);
    }

    @Override
    public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
      List<RegionInfo> regions = super.getRegionsOnServer(serverName);
      // ServerCrashProcedure calls this method, so block on the CountDownLatch here
      if (SCP_LATCH != null && SERVER_FOR_TEST != null && serverName.equals(SERVER_FOR_TEST)) {
        try {
          LOG.info("ServerCrashProcedure is waiting on the CountDownLatch");
          SCP_LATCH.await();
          LOG.info("Continuing the ServerCrashProcedure");
          SCP_LATCH = null;
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      return regions;
    }
  }
}