001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.CountDownLatch;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.CompatibilityFactory;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.StartTestingClusterOption;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
039import org.apache.hadoop.hbase.master.assignment.ServerState;
040import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
041import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
042import org.apache.hadoop.hbase.master.region.MasterRegion;
043import org.apache.hadoop.hbase.procedure2.Procedure;
044import org.apache.hadoop.hbase.test.MetricsAssertHelper;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.testclassification.MasterTests;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@Tag(MasterTests.TAG)
053@Tag(LargeTests.TAG)
054public class TestClusterRestartFailover extends AbstractTestRestartCluster {
055
056  private static final Logger LOG = LoggerFactory.getLogger(TestClusterRestartFailover.class);
057  private static final MetricsAssertHelper metricsHelper =
058    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
059
060  private volatile static CountDownLatch SCP_LATCH;
061  private static ServerName SERVER_FOR_TEST;
062
063  @Override
064  protected boolean splitWALCoordinatedByZk() {
065    return true;
066  }
067
068  private ServerStateNode getServerStateNode(ServerName serverName) {
069    return UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
070      .getServerNode(serverName);
071  }
072
073  /**
074   * Test for HBASE-22964
075   */
076  @Test
077  public void test() throws Exception {
078    setupCluster();
079    setupTable();
080
081    SERVER_FOR_TEST = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
082    UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
083    ServerStateNode serverNode = getServerStateNode(SERVER_FOR_TEST);
084    assertNotNull(serverNode);
085    assertTrue(serverNode.isInState(ServerState.ONLINE),
086      "serverNode should be ONLINE when cluster runs normally");
087
088    SCP_LATCH = new CountDownLatch(1);
089
090    // Shutdown cluster and restart
091    List<Integer> ports =
092      UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
093        .map(serverName -> serverName.getPort()).collect(Collectors.toList());
094    LOG.info("Shutting down cluster");
095    UTIL.getHBaseCluster().killAll();
096    UTIL.getHBaseCluster().waitUntilShutDown();
097    LOG.info("Restarting cluster");
098    UTIL.restartHBaseCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class)
099      .numMasters(1).numRegionServers(3).rsPorts(ports).build());
100    LOG.info("Started cluster");
101    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
102    LOG.info("Started cluster master, waiting for {}", SERVER_FOR_TEST);
103    UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
104    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
105
106      @Override
107      public boolean evaluate() throws Exception {
108        return !getServerStateNode(SERVER_FOR_TEST).isInState(ServerState.ONLINE);
109      }
110
111      @Override
112      public String explainFailure() throws Exception {
113        return "serverNode should not be ONLINE during SCP processing";
114      }
115    });
116    Optional<Procedure<?>> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
117      .filter(p -> (p instanceof ServerCrashProcedure)
118        && ((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST))
119      .findAny();
120    assertTrue(procedure.isPresent(), "Should have one SCP for " + SERVER_FOR_TEST);
121    assertEquals(Procedure.NO_PROC_ID,
122      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST),
123      "Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail");
124
125    // Wait the SCP to finish
126    LOG.info("Waiting on latch");
127    SCP_LATCH.countDown();
128    UTIL.waitFor(60000, () -> procedure.get().isFinished());
129    assertNull(getServerStateNode(SERVER_FOR_TEST),
130      "serverNode should be deleted after SCP finished");
131
132    assertEquals(Procedure.NO_PROC_ID,
133      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST),
134      "Even when the SCP is finished, the duplicate SCP should not be scheduled for "
135        + SERVER_FOR_TEST);
136
137    MetricsMasterSource masterSource =
138      UTIL.getHBaseCluster().getMaster().getMasterMetrics().getMetricsSource();
139    metricsHelper.assertCounter(MetricsMasterSource.SERVER_CRASH_METRIC_PREFIX + "SubmittedCount",
140      3, masterSource);
141  }
142
143  private void setupCluster() throws Exception {
144    LOG.info("Setup cluster");
145    UTIL.startMiniCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class)
146      .numMasters(1).numRegionServers(3).build());
147    // this test has been flaky. When it is rerun by surefire, the underlying minicluster isn't
148    // completely cleaned. specifically, the metrics system isn't reset. The result is an otherwise
149    // successful re-run is failed because there's 8 or 12 SCPcounts instead of the 4 that a
150    // single run of the test would otherwise produce. Thus, explicitly reset the metrics source
151    // each time we setup the cluster.
152    UTIL.getMiniHBaseCluster().getMaster().getMasterMetrics().getMetricsSource().init();
153    LOG.info("Cluster is up");
154    UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
155    LOG.info("Master is up");
156    // wait for all SCPs finished
157    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
158      .noneMatch(p -> p instanceof ServerCrashProcedure));
159    LOG.info("No SCPs");
160  }
161
162  private void setupTable() throws Exception {
163    TableName tableName = TABLES[0];
164    UTIL.createMultiRegionTable(tableName, FAMILY);
165    UTIL.waitTableAvailable(tableName);
166    Table table = UTIL.getConnection().getTable(tableName);
167    for (int i = 0; i < 100; i++) {
168      UTIL.loadTable(table, FAMILY);
169    }
170  }
171
172  public static final class HMasterForTest extends HMaster {
173
174    public HMasterForTest(Configuration conf) throws IOException {
175      super(conf);
176    }
177
178    @Override
179    protected AssignmentManager createAssignmentManager(MasterServices master,
180      MasterRegion masterRegion) {
181      return new AssignmentManagerForTest(master, masterRegion);
182    }
183  }
184
185  private static final class AssignmentManagerForTest extends AssignmentManager {
186
187    public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion) {
188      super(master, masterRegion);
189    }
190
191    @Override
192    public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
193      List<RegionInfo> regions = super.getRegionsOnServer(serverName);
194      // ServerCrashProcedure will call this method, so wait the CountDownLatch here
195      if (SCP_LATCH != null && SERVER_FOR_TEST != null && serverName.equals(SERVER_FOR_TEST)) {
196        try {
197          LOG.info("ServerCrashProcedure wait the CountDownLatch here");
198          SCP_LATCH.await();
199          LOG.info("Continue the ServerCrashProcedure");
200          SCP_LATCH = null;
201        } catch (InterruptedException e) {
202          throw new RuntimeException(e);
203        }
204      }
205      return regions;
206    }
207  }
208}