/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.ServerState;
import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

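/**
 * Tests cluster restart with a held-up ServerCrashProcedure: a server that does not host
 * hbase:namespace is chosen, the whole cluster is killed and restarted, and the chosen server's
 * SCP is blocked on a latch. The test verifies that no duplicate SCP can be scheduled while the
 * first one is still running, and that the server's state node is cleaned up once the SCP
 * finishes.
 */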
@Category({ MasterTests.class, LargeTests.class })
public class TestClusterRestartFailover extends AbstractTestRestartCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestClusterRestartFailover.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestClusterRestartFailover.class);
  private static final MetricsAssertHelper metricsHelper =
    CompatibilityFactory.getInstance(MetricsAssertHelper.class);

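  // SCP_LATCH blocks the ServerCrashProcedure for SERVER_FOR_TEST (see AssignmentManagerForTest)
  // until the test counts it down; SERVER_FOR_TEST is the server whose crash is being processed.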
  private static volatile CountDownLatch SCP_LATCH;
  private static ServerName SERVER_FOR_TEST;

  @Override
  protected boolean splitWALCoordinatedByZk() {
    return true;
  }

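  /** Returns the state node the master's AssignmentManager keeps for the given server. */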
  private ServerStateNode getServerStateNode(ServerName serverName) {
    return UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
      .getServerNode(serverName);
  }

  /**
   * Test for HBASE-22964
   */
  @Test
  public void test() throws Exception {
    setupCluster();
    setupTable();

    // Find a server that does not have hbase:namespace on it. This test holds up SCPs. If it
    // held up the server hosting hbase:namespace, Master initialization would be blocked
    // because that table would not come online, and the test would fail.
    for (JVMClusterUtil.RegionServerThread rst :
        UTIL.getHBaseCluster().getLiveRegionServerThreads()) {
      HRegionServer rs = rst.getRegionServer();
      if (rs.getRegions(TableName.NAMESPACE_TABLE_NAME).isEmpty()) {
        SERVER_FOR_TEST = rs.getServerName();
      }
    }
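    // The chosen server should already be tracked by the AssignmentManager and be ONLINE.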
    UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
    ServerStateNode serverNode = getServerStateNode(SERVER_FOR_TEST);
    assertNotNull(serverNode);
    assertTrue("serverNode should be ONLINE when the cluster runs normally",
        serverNode.isInState(ServerState.ONLINE));

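    // Arm the latch so that AssignmentManagerForTest blocks the SCP for SERVER_FOR_TEST
    // after the restart below.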
    SCP_LATCH = new CountDownLatch(1);

    // Shut down the cluster and restart it on the same region server ports
    List<Integer> ports =
        UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
            .map(ServerName::getPort).collect(Collectors.toList());
    LOG.info("Shutting down cluster");
    UTIL.getHBaseCluster().killAll();
    UTIL.getHBaseCluster().waitUntilShutDown();
    LOG.info("Restarting cluster");
    UTIL.restartHBaseCluster(StartMiniClusterOption.builder().masterClass(HMasterForTest.class)
        .numMasters(1).numRegionServers(3).rsPorts(ports).build());
    LOG.info("Started cluster");
    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
    LOG.info("Started cluster master, waiting for {}", SERVER_FOR_TEST);
    UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
    serverNode = getServerStateNode(SERVER_FOR_TEST);
    assertFalse("serverNode should not be ONLINE during SCP processing",
        serverNode.isInState(ServerState.ONLINE));
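    // There should be a ServerCrashProcedure for the chosen server, currently blocked on
    // SCP_LATCH inside AssignmentManagerForTest.getRegionsOnServer().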
    Optional<Procedure<?>> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
        .filter(p -> (p instanceof ServerCrashProcedure) &&
            ((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST)).findAny();
    assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
    assertTrue("Expiring " + SERVER_FOR_TEST + " again while its SCP is running should fail",
      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
          Procedure.NO_PROC_ID);

    // Release the latch and wait for the SCP to finish
    LOG.info("Waiting on latch");
    SCP_LATCH.countDown();
    UTIL.waitFor(60000, () -> procedure.get().isFinished());

    assertFalse("A new SCP should be schedulable for " + SERVER_FOR_TEST +
            " once the previous SCP has finished",
      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
        Procedure.NO_PROC_ID);
    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
        .getServerNode(SERVER_FOR_TEST);
    assertNull("serverNode should be deleted after SCP finished", serverNode);

    MetricsMasterSource masterSource = UTIL.getHBaseCluster().getMaster().getMasterMetrics()
      .getMetricsSource();
    metricsHelper.assertCounter(MetricsMasterSource.SERVER_CRASH_METRIC_PREFIX + "SubmittedCount",
      4, masterSource);
  }

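  /** Starts a mini cluster using HMasterForTest and waits for all startup SCPs to finish. */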
  private void setupCluster() throws Exception {
    LOG.info("Setup cluster");
    UTIL.startMiniCluster(
        StartMiniClusterOption.builder().masterClass(HMasterForTest.class).numMasters(1)
            .numRegionServers(3).build());
    LOG.info("Cluster is up");
    UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
    LOG.info("Master is up");
    // wait for all SCPs to finish
    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
        .noneMatch(p -> p instanceof ServerCrashProcedure));
    LOG.info("No SCPs");
  }

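  /** Creates a multi-region test table and loads it with data. */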
  private void setupTable() throws Exception {
    TableName tableName = TABLES[0];
    UTIL.createMultiRegionTable(tableName, FAMILY);
    UTIL.waitTableAvailable(tableName);
    Table table = UTIL.getConnection().getTable(tableName);
    for (int i = 0; i < 100; i++) {
      UTIL.loadTable(table, FAMILY);
    }
  }

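  /** Master that installs AssignmentManagerForTest so the test can block SCPs. */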
  public static final class HMasterForTest extends HMaster {

    public HMasterForTest(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    protected AssignmentManager createAssignmentManager(MasterServices master) {
      return new AssignmentManagerForTest(master);
    }
  }

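  /**
   * AssignmentManager whose getRegionsOnServer(), which ServerCrashProcedure calls, waits on
   * SCP_LATCH when invoked for SERVER_FOR_TEST.
   */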
  private static final class AssignmentManagerForTest extends AssignmentManager {

    public AssignmentManagerForTest(MasterServices master) {
      super(master);
    }

    @Override
    public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
      List<RegionInfo> regions = super.getRegionsOnServer(serverName);
      // ServerCrashProcedure will call this method, so wait on the CountDownLatch here
      if (SCP_LATCH != null && SERVER_FOR_TEST != null && serverName.equals(SERVER_FOR_TEST)) {
        try {
          LOG.info("ServerCrashProcedure is waiting on the CountDownLatch here");
          SCP_LATCH.await();
          LOG.info("Continuing the ServerCrashProcedure");
          SCP_LATCH = null;
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      return regions;
    }
  }
}