001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.List;
025import java.util.NavigableSet;
026import java.util.Set;
027import java.util.TreeSet;
028import java.util.stream.Stream;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
036import org.apache.hadoop.hbase.StartTestingClusterOption;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionLocator;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.testclassification.MasterTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
046import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
047import org.junit.jupiter.api.BeforeEach;
048import org.junit.jupiter.api.Tag;
049import org.junit.jupiter.api.TestInfo;
050import org.junit.jupiter.api.TestTemplate;
051import org.junit.jupiter.params.provider.Arguments;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
056
057/**
058 * Tests the restarting of everything as done during rolling restarts.
059 */
060@Tag(MasterTests.TAG)
061@Tag(LargeTests.TAG)
062@HBaseParameterizedTestTemplate(name = "{index}: splitWALCoordinatedByZK={0}")
063public class TestRollingRestart {
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestRollingRestart.class);
066
067  private static HBaseTestingUtil TEST_UTIL;
068  private String testMethodName;
069
070  @BeforeEach
071  public void setTestMethod(TestInfo testInfo) {
072    testMethodName = testInfo.getTestMethod().get().getName();
073  }
074
075  private final boolean splitWALCoordinatedByZK;
076
077  public TestRollingRestart(boolean splitWALCoordinatedByZK) {
078    this.splitWALCoordinatedByZK = splitWALCoordinatedByZK;
079  }
080
081  @TestTemplate
082  public void testBasicRollingRestart() throws Exception {
083
084    // Start a cluster with 2 masters and 4 regionservers
085    final int NUM_MASTERS = 2;
086    final int NUM_RS = 3;
087    final int NUM_REGIONS_TO_CREATE = 20;
088
089    int expectedNumRS = 3;
090
091    // Start the cluster
092    log("Starting cluster");
093    Configuration conf = HBaseConfiguration.create();
094    conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK);
095    TEST_UTIL = new HBaseTestingUtil(conf);
096    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(NUM_MASTERS)
097      .numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
098    TEST_UTIL.startMiniCluster(option);
099    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
100    log("Waiting for active/ready master");
101    cluster.waitForActiveAndReadyMaster();
102
103    // Create a table with regions
104    final TableName tableName = TableName.valueOf(testMethodName.replaceAll("[\\[|\\]]", "-"));
105    byte[] family = Bytes.toBytes("family");
106    log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
107    Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE);
108    int numRegions = -1;
109    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
110      numRegions = r.getStartKeys().length;
111    }
112    numRegions += 1; // catalogs
113    log("Waiting for no more RIT\n");
114    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
115    log("Disabling table\n");
116    TEST_UTIL.getAdmin().disableTable(tableName);
117    log("Waiting for no more RIT\n");
118    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
119    NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
120    log("Verifying only catalog region is assigned\n");
121    if (regions.size() != 1) {
122      for (String oregion : regions) {
123        log("Region still online: " + oregion);
124      }
125    }
126    assertEquals(1, regions.size());
127    log("Enabling table\n");
128    TEST_UTIL.getAdmin().enableTable(tableName);
129    log("Waiting for no more RIT\n");
130    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
131    log("Verifying there are " + numRegions + " assigned on cluster\n");
132    regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
133    assertRegionsAssigned(cluster, regions);
134    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
135
136    // Add a new regionserver
137    log("Adding a fourth RS");
138    RegionServerThread restarted = cluster.startRegionServer();
139    expectedNumRS++;
140    restarted.waitForServerOnline();
141    log("Additional RS is online");
142    log("Waiting for no more RIT");
143    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
144    log("Verifying there are " + numRegions + " assigned on cluster");
145    assertRegionsAssigned(cluster, regions);
146    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
147
148    // Master Restarts
149    List<MasterThread> masterThreads = cluster.getMasterThreads();
150    MasterThread activeMaster = null;
151    MasterThread backupMaster = null;
152    assertEquals(2, masterThreads.size());
153    if (masterThreads.get(0).getMaster().isActiveMaster()) {
154      activeMaster = masterThreads.get(0);
155      backupMaster = masterThreads.get(1);
156    } else {
157      activeMaster = masterThreads.get(1);
158      backupMaster = masterThreads.get(0);
159    }
160
161    // Bring down the backup master
162    log("Stopping backup master\n\n");
163    backupMaster.getMaster().stop("Stop of backup during rolling restart");
164    cluster.hbaseCluster.waitOnMaster(backupMaster);
165
166    // Bring down the primary master
167    log("Stopping primary master\n\n");
168    activeMaster.getMaster().stop("Stop of active during rolling restart");
169    cluster.hbaseCluster.waitOnMaster(activeMaster);
170
171    // Start primary master
172    log("Restarting primary master\n\n");
173    activeMaster = cluster.startMaster();
174    cluster.waitForActiveAndReadyMaster();
175
176    // Start backup master
177    log("Restarting backup master\n\n");
178    backupMaster = cluster.startMaster();
179
180    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
181
182    // RegionServer Restarts
183
184    // Bring them down, one at a time, waiting between each to complete
185    List<RegionServerThread> regionServers = cluster.getLiveRegionServerThreads();
186    int num = 1;
187    int total = regionServers.size();
188    for (RegionServerThread rst : regionServers) {
189      ServerName serverName = rst.getRegionServer().getServerName();
190      log("Stopping region server " + num + " of " + total + " [ " + serverName + "]");
191      rst.getRegionServer().stop("Stopping RS during rolling restart");
192      cluster.hbaseCluster.waitOnRegionServer(rst);
193      log("Waiting for RS shutdown to be handled by master");
194      waitForRSShutdownToStartAndFinish(activeMaster, serverName);
195      log("RS shutdown done, waiting for no more RIT");
196      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
197      log("Verifying there are " + numRegions + " assigned on cluster");
198      assertRegionsAssigned(cluster, regions);
199      expectedNumRS--;
200      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
201      log("Restarting region server " + num + " of " + total);
202      restarted = cluster.startRegionServer();
203      restarted.waitForServerOnline();
204      expectedNumRS++;
205      log("Region server " + num + " is back online");
206      log("Waiting for no more RIT");
207      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
208      log("Verifying there are " + numRegions + " assigned on cluster");
209      assertRegionsAssigned(cluster, regions);
210      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
211      num++;
212    }
213    Thread.sleep(1000);
214    assertRegionsAssigned(cluster, regions);
215
216    // TODO: Bring random 3 of 4 RS down at the same time
217
218    ht.close();
219    // Stop the cluster
220    TEST_UTIL.shutdownMiniCluster();
221  }
222
223  /**
224   * Checks if the SCP of specific dead server has been executed.
225   * @return true if the SCP of specific serverName has been executed, false if not
226   */
227  private boolean isDeadServerSCPExecuted(ServerName serverName) throws IOException {
228    return TEST_UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
229      .anyMatch(p -> p instanceof ServerCrashProcedure
230        && ((ServerCrashProcedure) p).getServerName().equals(serverName));
231  }
232
233  private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster, ServerName serverName)
234    throws InterruptedException, IOException {
235    ServerManager sm = activeMaster.getMaster().getServerManager();
236    // First wait for it to be in dead list
237    while (!sm.getDeadServers().isDeadServer(serverName)) {
238      log("Waiting for [" + serverName + "] to be listed as dead in master");
239      Thread.sleep(1);
240    }
241    log(
242      "Server [" + serverName + "] marked as dead, waiting for it to " + "finish dead processing");
243
244    TEST_UTIL.waitFor(60000, () -> isDeadServerSCPExecuted(serverName));
245
246    while (sm.areDeadServersInProgress()) {
247      log("Server [" + serverName + "] still being processed, waiting");
248      Thread.sleep(100);
249    }
250    log("Server [" + serverName + "] done with server shutdown processing");
251  }
252
253  private void log(String msg) {
254    LOG.debug("\n\nTRR: " + msg + "\n");
255  }
256
257  private int getNumberOfOnlineRegions(SingleProcessHBaseCluster cluster) {
258    int numFound = 0;
259    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
260      numFound += rst.getRegionServer().getNumberOfOnlineRegions();
261    }
262    return numFound;
263  }
264
265  private void assertRegionsAssigned(SingleProcessHBaseCluster cluster, Set<String> expectedRegions)
266    throws IOException {
267    int numFound = getNumberOfOnlineRegions(cluster);
268    if (expectedRegions.size() > numFound) {
269      log("Expected to find " + expectedRegions.size() + " but only found" + " " + numFound);
270      NavigableSet<String> foundRegions = HBaseTestingUtil.getAllOnlineRegions(cluster);
271      for (String region : expectedRegions) {
272        if (!foundRegions.contains(region)) {
273          log("Missing region: " + region);
274        }
275      }
276      assertEquals(expectedRegions.size(), numFound);
277    } else if (expectedRegions.size() < numFound) {
278      int doubled = numFound - expectedRegions.size();
279      log("Expected to find " + expectedRegions.size() + " but found" + " " + numFound + " ("
280        + doubled + " double assignments?)");
281      NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
282      for (String region : doubleRegions) {
283        log("Region is double assigned: " + region);
284      }
285      assertEquals(expectedRegions.size(), numFound);
286    } else {
287      log("Success!  Found expected number of " + numFound + " regions");
288    }
289  }
290
291  private NavigableSet<String> getDoubleAssignedRegions(SingleProcessHBaseCluster cluster)
292    throws IOException {
293    NavigableSet<String> online = new TreeSet<>();
294    NavigableSet<String> doubled = new TreeSet<>();
295    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
296      for (RegionInfo region : ProtobufUtil
297        .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
298        if (!online.add(region.getRegionNameAsString())) {
299          doubled.add(region.getRegionNameAsString());
300        }
301      }
302    }
303    return doubled;
304  }
305
306  public static Stream<Arguments> parameters() {
307    return Arrays.asList(false, true).stream().map(Arguments::of);
308  }
309}