001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.List;
026import java.util.NavigableSet;
027import java.util.Set;
028import java.util.TreeSet;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.MiniHBaseCluster;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.StartMiniClusterOption;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionLocator;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.testclassification.MasterTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
046import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
047import org.junit.ClassRule;
048import org.junit.Rule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.junit.rules.TestName;
052import org.junit.runner.RunWith;
053import org.junit.runners.Parameterized;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
058
059/**
060 * Tests the restarting of everything as done during rolling restarts.
061 */
062@RunWith(Parameterized.class)
063@Category({ MasterTests.class, LargeTests.class })
064public class TestRollingRestart {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestRollingRestart.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestRollingRestart.class);
071
072  private static HBaseTestingUtility TEST_UTIL;
073  @Rule
074  public TestName name = new TestName();
075
076  @Parameterized.Parameter
077  public boolean splitWALCoordinatedByZK;
078
079  @Test
080  public void testBasicRollingRestart() throws Exception {
081
082    // Start a cluster with 2 masters and 4 regionservers
083    final int NUM_MASTERS = 2;
084    final int NUM_RS = 3;
085    final int NUM_REGIONS_TO_CREATE = 20;
086
087    int expectedNumRS = 3;
088
089    // Start the cluster
090    log("Starting cluster");
091    Configuration conf = HBaseConfiguration.create();
092    conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK);
093    TEST_UTIL = new HBaseTestingUtility(conf);
094    StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(NUM_MASTERS)
095      .numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
096    TEST_UTIL.startMiniCluster(option);
097    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
098    log("Waiting for active/ready master");
099    cluster.waitForActiveAndReadyMaster();
100
101    // Create a table with regions
102    final TableName tableName =
103      TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-"));
104    byte[] family = Bytes.toBytes("family");
105    log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
106    Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE);
107    int numRegions = -1;
108    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
109      numRegions = r.getStartKeys().length;
110    }
111    numRegions += 1; // catalogs
112    log("Waiting for no more RIT\n");
113    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
114    log("Disabling table\n");
115    TEST_UTIL.getAdmin().disableTable(tableName);
116    log("Waiting for no more RIT\n");
117    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
118    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
119    log("Verifying only catalog and namespace regions are assigned\n");
120    if (regions.size() != 2) {
121      for (String oregion : regions)
122        log("Region still online: " + oregion);
123    }
124    assertEquals(2, regions.size());
125    log("Enabling table\n");
126    TEST_UTIL.getAdmin().enableTable(tableName);
127    log("Waiting for no more RIT\n");
128    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
129    log("Verifying there are " + numRegions + " assigned on cluster\n");
130    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
131    assertRegionsAssigned(cluster, regions);
132    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
133
134    // Add a new regionserver
135    log("Adding a fourth RS");
136    RegionServerThread restarted = cluster.startRegionServer();
137    expectedNumRS++;
138    restarted.waitForServerOnline();
139    log("Additional RS is online");
140    log("Waiting for no more RIT");
141    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
142    log("Verifying there are " + numRegions + " assigned on cluster");
143    assertRegionsAssigned(cluster, regions);
144    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
145
146    // Master Restarts
147    List<MasterThread> masterThreads = cluster.getMasterThreads();
148    MasterThread activeMaster = null;
149    MasterThread backupMaster = null;
150    assertEquals(2, masterThreads.size());
151    if (masterThreads.get(0).getMaster().isActiveMaster()) {
152      activeMaster = masterThreads.get(0);
153      backupMaster = masterThreads.get(1);
154    } else {
155      activeMaster = masterThreads.get(1);
156      backupMaster = masterThreads.get(0);
157    }
158
159    // Bring down the backup master
160    log("Stopping backup master\n\n");
161    backupMaster.getMaster().stop("Stop of backup during rolling restart");
162    cluster.hbaseCluster.waitOnMaster(backupMaster);
163
164    // Bring down the primary master
165    log("Stopping primary master\n\n");
166    activeMaster.getMaster().stop("Stop of active during rolling restart");
167    cluster.hbaseCluster.waitOnMaster(activeMaster);
168
169    // Start primary master
170    log("Restarting primary master\n\n");
171    activeMaster = cluster.startMaster();
172    cluster.waitForActiveAndReadyMaster();
173
174    // Start backup master
175    log("Restarting backup master\n\n");
176    backupMaster = cluster.startMaster();
177
178    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
179
180    // RegionServer Restarts
181
182    // Bring them down, one at a time, waiting between each to complete
183    List<RegionServerThread> regionServers = cluster.getLiveRegionServerThreads();
184    int num = 1;
185    int total = regionServers.size();
186    for (RegionServerThread rst : regionServers) {
187      ServerName serverName = rst.getRegionServer().getServerName();
188      log("Stopping region server " + num + " of " + total + " [ " + serverName + "]");
189      rst.getRegionServer().stop("Stopping RS during rolling restart");
190      cluster.hbaseCluster.waitOnRegionServer(rst);
191      log("Waiting for RS shutdown to be handled by master");
192      waitForRSShutdownToStartAndFinish(activeMaster, serverName);
193      log("RS shutdown done, waiting for no more RIT");
194      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
195      log("Verifying there are " + numRegions + " assigned on cluster");
196      assertRegionsAssigned(cluster, regions);
197      expectedNumRS--;
198      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
199      log("Restarting region server " + num + " of " + total);
200      restarted = cluster.startRegionServer();
201      restarted.waitForServerOnline();
202      expectedNumRS++;
203      log("Region server " + num + " is back online");
204      log("Waiting for no more RIT");
205      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
206      log("Verifying there are " + numRegions + " assigned on cluster");
207      assertRegionsAssigned(cluster, regions);
208      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
209      num++;
210    }
211    Thread.sleep(1000);
212    assertRegionsAssigned(cluster, regions);
213
214    // TODO: Bring random 3 of 4 RS down at the same time
215
216    ht.close();
217    // Stop the cluster
218    TEST_UTIL.shutdownMiniCluster();
219  }
220
221  /**
222   * Checks if the SCP of specific dead server has been executed.
223   * @return true if the SCP of specific serverName has been executed, false if not
224   */
225  private boolean isDeadServerSCPExecuted(ServerName serverName) throws IOException {
226    return TEST_UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
227      .anyMatch(p -> p instanceof ServerCrashProcedure
228        && ((ServerCrashProcedure) p).getServerName().equals(serverName));
229  }
230
231  private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster, ServerName serverName)
232    throws InterruptedException, IOException {
233    ServerManager sm = activeMaster.getMaster().getServerManager();
234    // First wait for it to be in dead list
235    while (!sm.getDeadServers().isDeadServer(serverName)) {
236      log("Waiting for [" + serverName + "] to be listed as dead in master");
237      Thread.sleep(1);
238    }
239    log(
240      "Server [" + serverName + "] marked as dead, waiting for it to " + "finish dead processing");
241
242    TEST_UTIL.waitFor(60000, () -> isDeadServerSCPExecuted(serverName));
243
244    while (sm.areDeadServersInProgress()) {
245      log("Server [" + serverName + "] still being processed, waiting");
246      Thread.sleep(100);
247    }
248    log("Server [" + serverName + "] done with server shutdown processing");
249  }
250
251  private void log(String msg) {
252    LOG.debug("\n\nTRR: " + msg + "\n");
253  }
254
255  private int getNumberOfOnlineRegions(MiniHBaseCluster cluster) {
256    int numFound = 0;
257    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
258      numFound += rst.getRegionServer().getNumberOfOnlineRegions();
259    }
260    for (MasterThread mt : cluster.getMasterThreads()) {
261      numFound += mt.getMaster().getNumberOfOnlineRegions();
262    }
263    return numFound;
264  }
265
266  private void assertRegionsAssigned(MiniHBaseCluster cluster, Set<String> expectedRegions)
267    throws IOException {
268    int numFound = getNumberOfOnlineRegions(cluster);
269    if (expectedRegions.size() > numFound) {
270      log("Expected to find " + expectedRegions.size() + " but only found" + " " + numFound);
271      NavigableSet<String> foundRegions = HBaseTestingUtility.getAllOnlineRegions(cluster);
272      for (String region : expectedRegions) {
273        if (!foundRegions.contains(region)) {
274          log("Missing region: " + region);
275        }
276      }
277      assertEquals(expectedRegions.size(), numFound);
278    } else if (expectedRegions.size() < numFound) {
279      int doubled = numFound - expectedRegions.size();
280      log("Expected to find " + expectedRegions.size() + " but found" + " " + numFound + " ("
281        + doubled + " double assignments?)");
282      NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
283      for (String region : doubleRegions) {
284        log("Region is double assigned: " + region);
285      }
286      assertEquals(expectedRegions.size(), numFound);
287    } else {
288      log("Success!  Found expected number of " + numFound + " regions");
289    }
290  }
291
292  private NavigableSet<String> getDoubleAssignedRegions(MiniHBaseCluster cluster)
293    throws IOException {
294    NavigableSet<String> online = new TreeSet<>();
295    NavigableSet<String> doubled = new TreeSet<>();
296    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
297      for (RegionInfo region : ProtobufUtil
298        .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
299        if (!online.add(region.getRegionNameAsString())) {
300          doubled.add(region.getRegionNameAsString());
301        }
302      }
303    }
304    return doubled;
305  }
306
307  @Parameterized.Parameters
308  public static Collection coordinatedByZK() {
309    return Arrays.asList(false, true);
310  }
311}