001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.io.UncheckedIOException;
026import java.util.List;
027import java.util.Map;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.MiniHBaseCluster;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.StartMiniClusterOption;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
037import org.apache.hadoop.hbase.testclassification.MasterTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.JVMClusterUtil;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Category({ MasterTests.class, MediumTests.class })
047public class TestRetainAssignmentOnRestart extends AbstractTestRestartCluster {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051    HBaseClassTestRule.forClass(TestRetainAssignmentOnRestart.class);
052
053  private static final Logger LOG = LoggerFactory.getLogger(TestRetainAssignmentOnRestart.class);
054
055  private static int NUM_OF_RS = 3;
056
057  public static final class HMasterForTest extends HMaster {
058
059    public HMasterForTest(Configuration conf) throws IOException {
060      super(conf);
061    }
062
063    @Override
064    protected void startProcedureExecutor() throws IOException {
065      // only start procedure executor when we have all the regionservers ready to take regions
066      new Thread(() -> {
067        for (;;) {
068          if (getServerManager().createDestinationServersList().size() == NUM_OF_RS) {
069            try {
070              HMasterForTest.super.startProcedureExecutor();
071            } catch (IOException e) {
072              throw new UncheckedIOException(e);
073            }
074            break;
075          }
076          try {
077            Thread.sleep(1000);
078          } catch (InterruptedException e) {
079          }
080        }
081      }).start();
082    }
083  }
084
085  @Override
086  protected boolean splitWALCoordinatedByZk() {
087    return true;
088  }
089
090  /**
091   * This tests retaining assignments on a cluster restart
092   */
093  @Test
094  public void testRetainAssignmentOnClusterRestart() throws Exception {
095    setupCluster();
096    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
097    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
098    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
099    assertEquals(NUM_OF_RS, threads.size());
100    int[] rsPorts = new int[NUM_OF_RS];
101    for (int i = 0; i < NUM_OF_RS; i++) {
102      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
103    }
104
105    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
106    // use it to load all user region placements
107    SnapshotOfRegionAssignmentFromMeta snapshot =
108      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
109    snapshot.initialize();
110    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
111    for (ServerName serverName : regionToRegionServerMap.values()) {
112      boolean found = false; // Test only, no need to optimize
113      for (int k = 0; k < NUM_OF_RS && !found; k++) {
114        found = serverName.getPort() == rsPorts[k];
115      }
116      assertTrue(found);
117    }
118
119    LOG.info("\n\nShutting down HBase cluster");
120    cluster.stopMaster(0);
121    cluster.shutdown();
122    cluster.waitUntilShutDown();
123
124    LOG.info("\n\nSleeping a bit");
125    Thread.sleep(2000);
126
127    LOG.info("\n\nStarting cluster the second time with the same ports");
128    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
129    master = cluster.startMaster().getMaster();
130    for (int i = 0; i < NUM_OF_RS; i++) {
131      cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, rsPorts[i]);
132      cluster.startRegionServer();
133    }
134
135    ensureServersWithSamePort(master, rsPorts);
136
137    // Wait till master is initialized and all regions are assigned
138    for (TableName TABLE : TABLES) {
139      UTIL.waitTableAvailable(TABLE);
140    }
141    UTIL.waitUntilNoRegionsInTransition(60000);
142
143    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
144    snapshot.initialize();
145    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
146    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
147    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
148      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
149      ServerName currentServer = entry.getValue();
150      LOG.info(
151        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
152      assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
153      assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
154    }
155  }
156
157  /**
158   * This tests retaining assignments on a single node restart
159   */
160  @Test
161  public void testRetainAssignmentOnSingleRSRestart() throws Exception {
162    setupCluster();
163    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
164    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
165    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
166    assertEquals(NUM_OF_RS, threads.size());
167    int[] rsPorts = new int[NUM_OF_RS];
168    for (int i = 0; i < NUM_OF_RS; i++) {
169      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
170    }
171
172    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
173    // use it to load all user region placements
174    SnapshotOfRegionAssignmentFromMeta snapshot =
175      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
176    snapshot.initialize();
177    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
178    for (ServerName serverName : regionToRegionServerMap.values()) {
179      boolean found = false; // Test only, no need to optimize
180      for (int k = 0; k < NUM_OF_RS && !found; k++) {
181        found = serverName.getPort() == rsPorts[k];
182      }
183      assertTrue(found);
184    }
185
186    // Server to be restarted
187    ServerName deadRS = threads.get(0).getRegionServer().getServerName();
188    LOG.info("\n\nStopping HMaster and {} server", deadRS);
189    // Stopping master first so that region server SCP will not be initiated
190    cluster.stopMaster(0);
191    cluster.waitForMasterToStop(master.getServerName(), 5000);
192    cluster.stopRegionServer(deadRS);
193
194    LOG.info("\n\nSleeping a bit");
195    Thread.sleep(2000);
196
197    LOG.info("\n\nStarting HMaster and region server {} second time with the same port", deadRS);
198    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
199    master = cluster.startMaster().getMaster();
200    cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
201    cluster.startRegionServer();
202
203    ensureServersWithSamePort(master, rsPorts);
204
205    // Wait till master is initialized and all regions are assigned
206    for (TableName TABLE : TABLES) {
207      UTIL.waitTableAvailable(TABLE);
208    }
209    UTIL.waitUntilNoRegionsInTransition(60000);
210
211    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
212    snapshot.initialize();
213    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
214    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
215    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
216      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
217      ServerName currentServer = entry.getValue();
218      LOG.info(
219        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
220      assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
221
222      if (deadRS.getPort() == oldServer.getPort()) {
223        // Restarted RS start code wont be same
224        assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
225      } else {
226        assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
227      }
228    }
229  }
230
231  private void setupCluster() throws Exception, IOException, InterruptedException {
232    // Set Zookeeper based connection registry since we will stop master and start a new master
233    // without populating the underlying config for the connection.
234    UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
235      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
236    // Enable retain assignment during ServerCrashProcedure
237    UTIL.getConfiguration().setBoolean(ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT, true);
238    UTIL.startMiniCluster(StartMiniClusterOption.builder().masterClass(HMasterForTest.class)
239      .numRegionServers(NUM_OF_RS).build());
240
241    // Turn off balancer
242    UTIL.getMiniHBaseCluster().getMaster().getMasterRpcServices().synchronousBalanceSwitch(false);
243
244    LOG.info("\n\nCreating tables");
245    for (TableName TABLE : TABLES) {
246      UTIL.createTable(TABLE, FAMILY);
247    }
248    for (TableName TABLE : TABLES) {
249      UTIL.waitTableEnabled(TABLE);
250    }
251
252    UTIL.getMiniHBaseCluster().getMaster();
253    UTIL.waitUntilNoRegionsInTransition(60000);
254  }
255
256  private void ensureServersWithSamePort(HMaster master, int[] rsPorts) {
257    // Make sure live regionservers are on the same host/port
258    List<ServerName> localServers = master.getServerManager().getOnlineServersList();
259    assertEquals(NUM_OF_RS, localServers.size());
260    for (int i = 0; i < NUM_OF_RS; i++) {
261      boolean found = false;
262      for (ServerName serverName : localServers) {
263        if (serverName.getPort() == rsPorts[i]) {
264          found = true;
265          break;
266        }
267      }
268      assertTrue(found);
269    }
270  }
271}