001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.CountDownLatch;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.CompatibilityFactory;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.StartTestingClusterOption;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
039import org.apache.hadoop.hbase.master.assignment.ServerState;
040import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
041import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
042import org.apache.hadoop.hbase.master.region.MasterRegion;
043import org.apache.hadoop.hbase.procedure2.Procedure;
044import org.apache.hadoop.hbase.test.MetricsAssertHelper;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.testclassification.MasterTests;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053@Category({ MasterTests.class, LargeTests.class })
054public class TestClusterRestartFailover extends AbstractTestRestartCluster {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestClusterRestartFailover.class);
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestClusterRestartFailover.class);
061  private static final MetricsAssertHelper metricsHelper =
062    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
063
064  private volatile static CountDownLatch SCP_LATCH;
065  private static ServerName SERVER_FOR_TEST;
066
067  @Override
068  protected boolean splitWALCoordinatedByZk() {
069    return true;
070  }
071
072  private ServerStateNode getServerStateNode(ServerName serverName) {
073    return UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
074      .getServerNode(serverName);
075  }
076
077  /**
078   * Test for HBASE-22964
079   */
080  @Test
081  public void test() throws Exception {
082    setupCluster();
083    setupTable();
084
085    SERVER_FOR_TEST = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
086    UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
087    ServerStateNode serverNode = getServerStateNode(SERVER_FOR_TEST);
088    assertNotNull(serverNode);
089    assertTrue("serverNode should be ONLINE when cluster runs normally",
090        serverNode.isInState(ServerState.ONLINE));
091
092    SCP_LATCH = new CountDownLatch(1);
093
094    // Shutdown cluster and restart
095    List<Integer> ports =
096        UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
097            .map(serverName -> serverName.getPort()).collect(Collectors.toList());
098    LOG.info("Shutting down cluster");
099    UTIL.getHBaseCluster().killAll();
100    UTIL.getHBaseCluster().waitUntilShutDown();
101    LOG.info("Restarting cluster");
102    UTIL.restartHBaseCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class)
103        .numMasters(1).numRegionServers(3).rsPorts(ports).build());
104    LOG.info("Started cluster");
105    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
106    LOG.info("Started cluster master, waiting for {}", SERVER_FOR_TEST);
107    UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
108    serverNode = getServerStateNode(SERVER_FOR_TEST);
109    assertFalse("serverNode should not be ONLINE during SCP processing",
110        serverNode.isInState(ServerState.ONLINE));
111    Optional<Procedure<?>> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
112        .filter(p -> (p instanceof ServerCrashProcedure) &&
113            ((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST)).findAny();
114    assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
115    assertTrue("Submit the SCP for the same serverName " + SERVER_FOR_TEST + " which should fail",
116      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
117          Procedure.NO_PROC_ID);
118
119    // Wait the SCP to finish
120    LOG.info("Waiting on latch");
121    SCP_LATCH.countDown();
122    UTIL.waitFor(60000, () -> procedure.get().isFinished());
123
124    assertFalse("Even when the SCP is finished, the duplicate SCP should not be scheduled for " +
125            SERVER_FOR_TEST,
126      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST) ==
127        Procedure.NO_PROC_ID);
128    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
129        .getServerNode(SERVER_FOR_TEST);
130    assertNull("serverNode should be deleted after SCP finished", serverNode);
131
132    MetricsMasterSource masterSource = UTIL.getHBaseCluster().getMaster().getMasterMetrics()
133      .getMetricsSource();
134    metricsHelper.assertCounter(MetricsMasterSource.SERVER_CRASH_METRIC_PREFIX+"SubmittedCount",
135      4, masterSource);
136  }
137
138  private void setupCluster() throws Exception {
139    LOG.info("Setup cluster");
140    UTIL.startMiniCluster(
141        StartTestingClusterOption.builder().masterClass(HMasterForTest.class).numMasters(1)
142            .numRegionServers(3).build());
143    LOG.info("Cluster is up");
144    UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
145    LOG.info("Master is up");
146    // wait for all SCPs finished
147    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
148        .noneMatch(p -> p instanceof ServerCrashProcedure));
149    LOG.info("No SCPs");
150  }
151
152  private void setupTable() throws Exception {
153    TableName tableName = TABLES[0];
154    UTIL.createMultiRegionTable(tableName, FAMILY);
155    UTIL.waitTableAvailable(tableName);
156    Table table = UTIL.getConnection().getTable(tableName);
157    for (int i = 0; i < 100; i++) {
158      UTIL.loadTable(table, FAMILY);
159    }
160  }
161
162  public static final class HMasterForTest extends HMaster {
163
164    public HMasterForTest(Configuration conf) throws IOException {
165      super(conf);
166    }
167
168    @Override
169    protected AssignmentManager createAssignmentManager(MasterServices master,
170      MasterRegion masterRegion) {
171      return new AssignmentManagerForTest(master, masterRegion);
172    }
173  }
174
175  private static final class AssignmentManagerForTest extends AssignmentManager {
176
177    public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion) {
178      super(master, masterRegion);
179    }
180
181    @Override
182    public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
183      List<RegionInfo> regions = super.getRegionsOnServer(serverName);
184      // ServerCrashProcedure will call this method, so wait the CountDownLatch here
185      if (SCP_LATCH != null && SERVER_FOR_TEST != null && serverName.equals(SERVER_FOR_TEST)) {
186        try {
187          LOG.info("ServerCrashProcedure wait the CountDownLatch here");
188          SCP_LATCH.await();
189          LOG.info("Continue the ServerCrashProcedure");
190          SCP_LATCH = null;
191        } catch (InterruptedException e) {
192          throw new RuntimeException(e);
193        }
194      }
195      return regions;
196    }
197  }
198}