/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Tests the restarting of everything as done during rolling restarts.
 */
@RunWith(Parameterized.class)
@Category({MasterTests.class, LargeTests.class})
public class TestRollingRestart {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRollingRestart.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRollingRestart.class);

  private static HBaseTestingUtil TEST_UTIL;

  @Rule
  public TestName name = new TestName();

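  // Controls whether WAL splitting after a region server death is coordinated through
  // ZooKeeper or handled by the master-side procedure framework; the test runs once
  // with each setting (see coordinatedByZK() below).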
  @Parameterized.Parameter
  public boolean splitWALCoordinatedByZK;

  @Test
  public void testBasicRollingRestart() throws Exception {

    // Start a cluster with 2 masters and 3 regionservers
    final int NUM_MASTERS = 2;
    final int NUM_RS = 3;
    final int NUM_REGIONS_TO_CREATE = 20;

    int expectedNumRS = NUM_RS;

    // Start the cluster
    log("Starting cluster");
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
        splitWALCoordinatedByZK);
    TEST_UTIL = new HBaseTestingUtil(conf);
    StartTestingClusterOption option = StartTestingClusterOption.builder()
        .numMasters(NUM_MASTERS).numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
    TEST_UTIL.startMiniCluster(option);
    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    log("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();

    // Create a table with regions
    final TableName tableName =
        TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-"));
    byte[] family = Bytes.toBytes("family");
    log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
    Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE);
    int numRegions = -1;
    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      numRegions = r.getStartKeys().length;
    }
    numRegions += 1; // plus the hbase:meta catalog region
    log("Waiting for no more RIT\n");
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    log("Disabling table\n");
    TEST_UTIL.getAdmin().disableTable(tableName);
    log("Waiting for no more RIT\n");
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
    log("Verifying only catalog region is assigned\n");
    if (regions.size() != 1) {
      for (String oregion : regions) {
        log("Region still online: " + oregion);
      }
    }
    assertEquals(1, regions.size());
    log("Enabling table\n");
    TEST_UTIL.getAdmin().enableTable(tableName);
    log("Waiting for no more RIT\n");
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    log("Verifying there are " + numRegions + " assigned on cluster\n");
    regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
    assertRegionsAssigned(cluster, regions);
    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // Add a new regionserver
    log("Adding a fourth RS");
    RegionServerThread restarted = cluster.startRegionServer();
    expectedNumRS++;
    restarted.waitForServerOnline();
    log("Additional RS is online");
    log("Waiting for no more RIT");
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    log("Verifying there are " + numRegions + " assigned on cluster");
    assertRegionsAssigned(cluster, regions);
    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // Master Restarts
    List<MasterThread> masterThreads = cluster.getMasterThreads();
    MasterThread activeMaster = null;
    MasterThread backupMaster = null;
    assertEquals(2, masterThreads.size());
    if (masterThreads.get(0).getMaster().isActiveMaster()) {
      activeMaster = masterThreads.get(0);
      backupMaster = masterThreads.get(1);
    } else {
      activeMaster = masterThreads.get(1);
      backupMaster = masterThreads.get(0);
    }

    // Bring down the backup master
    log("Stopping backup master\n\n");
    backupMaster.getMaster().stop("Stop of backup during rolling restart");
    cluster.hbaseCluster.waitOnMaster(backupMaster);

    // Bring down the primary master
    log("Stopping primary master\n\n");
    activeMaster.getMaster().stop("Stop of active during rolling restart");
    cluster.hbaseCluster.waitOnMaster(activeMaster);

    // Start primary master
    log("Restarting primary master\n\n");
    activeMaster = cluster.startMaster();
    cluster.waitForActiveAndReadyMaster();

    // Start backup master
    log("Restarting backup master\n\n");
    backupMaster = cluster.startMaster();

    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // RegionServer Restarts

    // Roll the region servers one at a time, waiting for each shutdown to be fully
    // processed before bringing the server back up and moving on to the next
    List<RegionServerThread> regionServers =
      cluster.getLiveRegionServerThreads();
    int num = 1;
    int total = regionServers.size();
    for (RegionServerThread rst : regionServers) {
      ServerName serverName = rst.getRegionServer().getServerName();
      log("Stopping region server " + num + " of " + total + " [ " +
          serverName + "]");
      rst.getRegionServer().stop("Stopping RS during rolling restart");
      cluster.hbaseCluster.waitOnRegionServer(rst);
      log("Waiting for RS shutdown to be handled by master");
      waitForRSShutdownToStartAndFinish(activeMaster, serverName);
      log("RS shutdown done, waiting for no more RIT");
      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
      log("Verifying there are " + numRegions + " assigned on cluster");
      assertRegionsAssigned(cluster, regions);
      expectedNumRS--;
      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
      log("Restarting region server " + num + " of " + total);
      restarted = cluster.startRegionServer();
      restarted.waitForServerOnline();
      expectedNumRS++;
      log("Region server " + num + " is back online");
      log("Waiting for no more RIT");
      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
      log("Verifying there are " + numRegions + " assigned on cluster");
      assertRegionsAssigned(cluster, regions);
      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
      num++;
    }
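    // Give assignments a final moment to settle before the last verification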
    Thread.sleep(1000);
    assertRegionsAssigned(cluster, regions);

    // TODO: Bring random 3 of 4 RS down at the same time

    ht.close();
    // Stop the cluster
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Checks whether a ServerCrashProcedure (SCP) for the given dead server has been
   * submitted to the master.
   * @return true if an SCP for {@code serverName} exists on the master, false otherwise
   */
  private boolean isDeadServerSCPExecuted(ServerName serverName) throws IOException {
    return TEST_UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
        .anyMatch(p -> p instanceof ServerCrashProcedure
            && ((ServerCrashProcedure) p).getServerName().equals(serverName));
  }

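  /**
   * Blocks until the active master has listed the given region server as dead, scheduled
   * its ServerCrashProcedure, and finished processing the dead server.
   */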
  private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
      ServerName serverName) throws InterruptedException, IOException {
    ServerManager sm = activeMaster.getMaster().getServerManager();
    // First wait for it to be in dead list
    while (!sm.getDeadServers().isDeadServer(serverName)) {
      log("Waiting for [" + serverName + "] to be listed as dead in master");
      Thread.sleep(1);
    }
    log("Server [" + serverName + "] marked as dead, waiting for it to " +
        "finish dead processing");

    TEST_UTIL.waitFor(60000, () -> isDeadServerSCPExecuted(serverName));

    while (sm.areDeadServersInProgress()) {
      log("Server [" + serverName + "] still being processed, waiting");
      Thread.sleep(100);
    }
    log("Server [" + serverName + "] done with server shutdown processing");
  }

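  /** Logs through the test logger with surrounding blank lines so progress stands out. */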
  private void log(String msg) {
    LOG.debug("\n\nTRR: " + msg + "\n");
  }

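  /** Sums the number of online regions across all live region servers. */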
  private int getNumberOfOnlineRegions(SingleProcessHBaseCluster cluster) {
    int numFound = 0;
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      numFound += rst.getRegionServer().getNumberOfOnlineRegions();
    }
    return numFound;
  }

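  /**
   * Asserts that the number of regions online in the cluster matches the expected set,
   * logging any missing or double-assigned regions before failing.
   */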
  private void assertRegionsAssigned(SingleProcessHBaseCluster cluster,
      Set<String> expectedRegions) throws IOException {
    int numFound = getNumberOfOnlineRegions(cluster);
    if (expectedRegions.size() > numFound) {
      log("Expected to find " + expectedRegions.size() + " but only found " + numFound);
      NavigableSet<String> foundRegions =
        HBaseTestingUtil.getAllOnlineRegions(cluster);
      for (String region : expectedRegions) {
        if (!foundRegions.contains(region)) {
          log("Missing region: " + region);
        }
      }
      assertEquals(expectedRegions.size(), numFound);
    } else if (expectedRegions.size() < numFound) {
      int doubled = numFound - expectedRegions.size();
      log("Expected to find " + expectedRegions.size() + " but found " + numFound
          + " (" + doubled + " double assignments?)");
      NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
      for (String region : doubleRegions) {
        log("Region is double assigned: " + region);
      }
      assertEquals(expectedRegions.size(), numFound);
    } else {
      log("Success! Found expected number of " + numFound + " regions");
    }
  }

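  /** Returns the names of regions reported online by more than one live region server. */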
  private NavigableSet<String> getDoubleAssignedRegions(
      SingleProcessHBaseCluster cluster) throws IOException {
    NavigableSet<String> online = new TreeSet<>();
    NavigableSet<String> doubled = new TreeSet<>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      for (RegionInfo region : ProtobufUtil.getOnlineRegions(
          rst.getRegionServer().getRSRpcServices())) {
        if (!online.add(region.getRegionNameAsString())) {
          doubled.add(region.getRegionNameAsString());
        }
      }
    }
    return doubled;
  }

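  // Run the test with both WAL split coordination modes: procedure-based (false)
  // and ZooKeeper-coordinated (true).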
  @Parameterized.Parameters
  public static Collection<Boolean> coordinatedByZK() {
    return Arrays.asList(false, true);
  }
}