001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.List;
026import java.util.NavigableSet;
027import java.util.Set;
028import java.util.TreeSet;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.MiniHBaseCluster;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.StartMiniClusterOption;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionLocator;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.testclassification.MasterTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
045import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
046import org.junit.ClassRule;
047import org.junit.Rule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.rules.TestName;
051import org.junit.runner.RunWith;
052import org.junit.runners.Parameterized;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057
058/**
059 * Tests the restarting of everything as done during rolling restarts.
060 */
061@RunWith(Parameterized.class)
062@Category({MasterTests.class, LargeTests.class})
063public class TestRollingRestart {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestRollingRestart.class);
068
069  private static final Logger LOG = LoggerFactory.getLogger(TestRollingRestart.class);
070
071  @Rule
072  public TestName name = new TestName();
073
074  @Parameterized.Parameter
075  public boolean splitWALCoordinatedByZK;
076
077  @Test
078  public void testBasicRollingRestart() throws Exception {
079
080    // Start a cluster with 2 masters and 4 regionservers
081    final int NUM_MASTERS = 2;
082    final int NUM_RS = 3;
083    final int NUM_REGIONS_TO_CREATE = 20;
084
085    int expectedNumRS = 3;
086
087    // Start the cluster
088    log("Starting cluster");
089    Configuration conf = HBaseConfiguration.create();
090    conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
091        splitWALCoordinatedByZK);
092    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
093    StartMiniClusterOption option = StartMiniClusterOption.builder()
094        .numMasters(NUM_MASTERS).numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
095    TEST_UTIL.startMiniCluster(option);
096    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
097    log("Waiting for active/ready master");
098    cluster.waitForActiveAndReadyMaster();
099
100    // Create a table with regions
101    final TableName tableName =
102        TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-"));
103    byte [] family = Bytes.toBytes("family");
104    log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
105    Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE);
106    int numRegions = -1;
107    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
108      numRegions = r.getStartKeys().length;
109    }
110    numRegions += 1; // catalogs
111    log("Waiting for no more RIT\n");
112    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
113    log("Disabling table\n");
114    TEST_UTIL.getAdmin().disableTable(tableName);
115    log("Waiting for no more RIT\n");
116    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
117    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
118    log("Verifying only catalog and namespace regions are assigned\n");
119    if (regions.size() != 2) {
120      for (String oregion : regions) log("Region still online: " + oregion);
121    }
122    assertEquals(2, regions.size());
123    log("Enabling table\n");
124    TEST_UTIL.getAdmin().enableTable(tableName);
125    log("Waiting for no more RIT\n");
126    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
127    log("Verifying there are " + numRegions + " assigned on cluster\n");
128    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
129    assertRegionsAssigned(cluster, regions);
130    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
131
132    // Add a new regionserver
133    log("Adding a fourth RS");
134    RegionServerThread restarted = cluster.startRegionServer();
135    expectedNumRS++;
136    restarted.waitForServerOnline();
137    log("Additional RS is online");
138    log("Waiting for no more RIT");
139    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
140    log("Verifying there are " + numRegions + " assigned on cluster");
141    assertRegionsAssigned(cluster, regions);
142    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
143
144    // Master Restarts
145    List<MasterThread> masterThreads = cluster.getMasterThreads();
146    MasterThread activeMaster = null;
147    MasterThread backupMaster = null;
148    assertEquals(2, masterThreads.size());
149    if (masterThreads.get(0).getMaster().isActiveMaster()) {
150      activeMaster = masterThreads.get(0);
151      backupMaster = masterThreads.get(1);
152    } else {
153      activeMaster = masterThreads.get(1);
154      backupMaster = masterThreads.get(0);
155    }
156
157    // Bring down the backup master
158    log("Stopping backup master\n\n");
159    backupMaster.getMaster().stop("Stop of backup during rolling restart");
160    cluster.hbaseCluster.waitOnMaster(backupMaster);
161
162    // Bring down the primary master
163    log("Stopping primary master\n\n");
164    activeMaster.getMaster().stop("Stop of active during rolling restart");
165    cluster.hbaseCluster.waitOnMaster(activeMaster);
166
167    // Start primary master
168    log("Restarting primary master\n\n");
169    activeMaster = cluster.startMaster();
170    cluster.waitForActiveAndReadyMaster();
171
172    // Start backup master
173    log("Restarting backup master\n\n");
174    backupMaster = cluster.startMaster();
175
176    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
177
178    // RegionServer Restarts
179
180    // Bring them down, one at a time, waiting between each to complete
181    List<RegionServerThread> regionServers =
182      cluster.getLiveRegionServerThreads();
183    int num = 1;
184    int total = regionServers.size();
185    for (RegionServerThread rst : regionServers) {
186      ServerName serverName = rst.getRegionServer().getServerName();
187      log("Stopping region server " + num + " of " + total + " [ " +
188          serverName + "]");
189      rst.getRegionServer().stop("Stopping RS during rolling restart");
190      cluster.hbaseCluster.waitOnRegionServer(rst);
191      log("Waiting for RS shutdown to be handled by master");
192      waitForRSShutdownToStartAndFinish(activeMaster, serverName);
193      log("RS shutdown done, waiting for no more RIT");
194      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
195      log("Verifying there are " + numRegions + " assigned on cluster");
196      assertRegionsAssigned(cluster, regions);
197      expectedNumRS--;
198      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
199      log("Restarting region server " + num + " of " + total);
200      restarted = cluster.startRegionServer();
201      restarted.waitForServerOnline();
202      expectedNumRS++;
203      log("Region server " + num + " is back online");
204      log("Waiting for no more RIT");
205      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
206      log("Verifying there are " + numRegions + " assigned on cluster");
207      assertRegionsAssigned(cluster, regions);
208      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
209      num++;
210    }
211    Thread.sleep(1000);
212    assertRegionsAssigned(cluster, regions);
213
214    // TODO: Bring random 3 of 4 RS down at the same time
215
216    ht.close();
217    // Stop the cluster
218    TEST_UTIL.shutdownMiniCluster();
219  }
220
221  private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
222      ServerName serverName) throws InterruptedException {
223    ServerManager sm = activeMaster.getMaster().getServerManager();
224    // First wait for it to be in dead list
225    while (!sm.getDeadServers().isDeadServer(serverName)) {
226      log("Waiting for [" + serverName + "] to be listed as dead in master");
227      Thread.sleep(1);
228    }
229    log("Server [" + serverName + "] marked as dead, waiting for it to " +
230        "finish dead processing");
231    while (sm.areDeadServersInProgress()) {
232      log("Server [" + serverName + "] still being processed, waiting");
233      Thread.sleep(100);
234    }
235    log("Server [" + serverName + "] done with server shutdown processing");
236  }
237
238  private void log(String msg) {
239    LOG.debug("\n\nTRR: " + msg + "\n");
240  }
241
242  private int getNumberOfOnlineRegions(MiniHBaseCluster cluster) {
243    int numFound = 0;
244    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
245      numFound += rst.getRegionServer().getNumberOfOnlineRegions();
246    }
247    for (MasterThread mt : cluster.getMasterThreads()) {
248      numFound += mt.getMaster().getNumberOfOnlineRegions();
249    }
250    return numFound;
251  }
252
253  private void assertRegionsAssigned(MiniHBaseCluster cluster,
254      Set<String> expectedRegions) throws IOException {
255    int numFound = getNumberOfOnlineRegions(cluster);
256    if (expectedRegions.size() > numFound) {
257      log("Expected to find " + expectedRegions.size() + " but only found"
258          + " " + numFound);
259      NavigableSet<String> foundRegions =
260        HBaseTestingUtility.getAllOnlineRegions(cluster);
261      for (String region : expectedRegions) {
262        if (!foundRegions.contains(region)) {
263          log("Missing region: " + region);
264        }
265      }
266      assertEquals(expectedRegions.size(), numFound);
267    } else if (expectedRegions.size() < numFound) {
268      int doubled = numFound - expectedRegions.size();
269      log("Expected to find " + expectedRegions.size() + " but found"
270          + " " + numFound + " (" + doubled + " double assignments?)");
271      NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
272      for (String region : doubleRegions) {
273        log("Region is double assigned: " + region);
274      }
275      assertEquals(expectedRegions.size(), numFound);
276    } else {
277      log("Success!  Found expected number of " + numFound + " regions");
278    }
279  }
280
281  private NavigableSet<String> getDoubleAssignedRegions(
282      MiniHBaseCluster cluster) throws IOException {
283    NavigableSet<String> online = new TreeSet<>();
284    NavigableSet<String> doubled = new TreeSet<>();
285    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
286      for (RegionInfo region : ProtobufUtil.getOnlineRegions(
287          rst.getRegionServer().getRSRpcServices())) {
288        if(!online.add(region.getRegionNameAsString())) {
289          doubled.add(region.getRegionNameAsString());
290        }
291      }
292    }
293    return doubled;
294  }
295
296
297  @Parameterized.Parameters
298  public static Collection coordinatedByZK() {
299    return Arrays.asList(false, true);
300  }
301}
302