001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.StringWriter;
025import java.util.concurrent.ScheduledThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import org.apache.commons.lang3.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.LocalHBaseCluster;
033import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.LoadBalancer;
038import org.apache.hadoop.hbase.master.ServerManager;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
042import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
043import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
044import org.apache.hadoop.hbase.util.Threads;
045import org.apache.zookeeper.KeeperException;
046import org.junit.After;
047import org.junit.Before;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
055
056@Category(LargeTests.class)
057public class TestRegionServerReportForDuty {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);
064
065  private static final long SLEEP_INTERVAL = 500;
066
067  private HBaseTestingUtility testUtil;
068  private LocalHBaseCluster cluster;
069  private RegionServerThread rs;
070  private RegionServerThread rs2;
071  private MasterThread master;
072  private MasterThread backupMaster;
073
074  @Before
075  public void setUp() throws Exception {
076    testUtil = new HBaseTestingUtility();
077    testUtil.startMiniDFSCluster(1);
078    testUtil.startMiniZKCluster(1);
079    testUtil.createRootDir();
080    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
081  }
082
083  @After
084  public void tearDown() throws Exception {
085    cluster.shutdown();
086    cluster.join();
087    testUtil.shutdownMiniZKCluster();
088    testUtil.shutdownMiniDFSCluster();
089  }
090
091  private static class LogCapturer {
092    private StringWriter sw = new StringWriter();
093    private org.apache.logging.log4j.core.appender.WriterAppender appender;
094    private org.apache.logging.log4j.core.Logger logger;
095
096    LogCapturer(org.apache.logging.log4j.core.Logger logger) {
097      this.logger = logger;
098      this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder()
099        .setName("test").setTarget(sw).build();
100      this.logger.addAppender(this.appender);
101    }
102
103    String getOutput() {
104      return sw.toString();
105    }
106
107    public void stopCapturing() {
108      this.logger.removeAppender(this.appender);
109    }
110  }
111
112  /**
113   * This test HMaster class will always throw ServerNotRunningYetException if checked.
114   */
115  public static class NeverInitializedMaster extends HMaster {
116    public NeverInitializedMaster(Configuration conf) throws IOException {
117      super(conf);
118    }
119
120    @Override
121    protected void checkServiceStarted() throws ServerNotRunningYetException {
122      throw new ServerNotRunningYetException("Server is not running yet");
123    }
124  }
125
126  /**
127   * Tests region server should backoff to report for duty if master is not ready.
128   */
129  @Test
130  public void testReportForDutyBackoff() throws IOException, InterruptedException {
131    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
132    master = cluster.addMaster();
133    master.start();
134
135    LogCapturer capturer =
136      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
137        .getLogger(HRegionServer.class));
138    // Set sleep interval relatively low so that exponential backoff is more demanding.
139    int msginterval = 100;
140    cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
141    rs = cluster.addRegionServer();
142    rs.start();
143
144    int interval = 10_000;
145    Thread.sleep(interval);
146    capturer.stopCapturing();
147    String output = capturer.getOutput();
148    LOG.info("{}", output);
149    String failMsg = "reportForDuty failed;";
150    int count = StringUtils.countMatches(output, failMsg);
151
152    // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
153    // Ideally we can assert the exact retry count. We relax here to tolerate contention error.
154    int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval));
155    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count,
156      expectedRetry / 2), count > expectedRetry / 2);
157    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count,
158      expectedRetry * 2), count < expectedRetry * 2);
159  }
160
161  /**
162   * Tests region sever reportForDuty with backup master becomes primary master after the first
163   * master goes away.
164   */
165  @Test
166  public void testReportForDutyWithMasterChange() throws Exception {
167
168    // Start a master and wait for it to become the active/primary master.
169    // Use a random unique port
170    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
171    // master has a rs. defaultMinToStart = 2
172    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
173    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
174      tablesOnMaster ? 2 : 1);
175    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
176      tablesOnMaster ? 2 : 1);
177    master = cluster.addMaster();
178    rs = cluster.addRegionServer();
179    LOG.debug("Starting master: " + master.getMaster().getServerName());
180    master.start();
181    rs.start();
182
183    waitForClusterOnline(master);
184
185    // Add a 2nd region server
186    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
187    rs2 = cluster.addRegionServer();
188    // Start the region server. This region server will refresh RPC connection
189    // from the current active master to the next active master before completing
190    // reportForDuty
191    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
192    rs2.start();
193
194    waitForSecondRsStarted();
195
196    // Stop the current master.
197    master.getMaster().stop("Stopping master");
198
199    // Start a new master and use another random unique port
200    // Also let it wait for exactly 2 region severs to report in.
201    // TODO: Add handling bindexception. Random port is not enough!!! Flakie test!
202    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
203    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
204      tablesOnMaster ? 3 : 2);
205    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
206      tablesOnMaster ? 3 : 2);
207    backupMaster = cluster.addMaster();
208    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
209    backupMaster.start();
210
211    waitForClusterOnline(backupMaster);
212
213    // Do some checking/asserts here.
214    assertTrue(backupMaster.getMaster().isActiveMaster());
215    assertTrue(backupMaster.getMaster().isInitialized());
216    assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(),
217      tablesOnMaster ? 3 : 2);
218
219  }
220
221  /**
222   * Tests region sever reportForDuty with RS RPC retry
223   */
224  @Test
225  public void testReportForDutyWithRSRpcRetry() throws Exception {
226    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
227      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
228        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
229
230    // Start a master and wait for it to become the active/primary master.
231    // Use a random unique port
232    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
233    // Override the default RS RPC retry interval of 100ms to 300ms
234    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
235    // master has a rs. defaultMinToStart = 2
236    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
237    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
238      tablesOnMaster ? 2 : 1);
239    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
240      tablesOnMaster ? 2 : 1);
241    master = cluster.addMaster();
242    rs = cluster.addRegionServer();
243    LOG.debug("Starting master: " + master.getMaster().getServerName());
244    master.start();
245    // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block
246    scheduledThreadPoolExecutor.schedule(new Runnable() {
247      @Override
248      public void run() {
249        rs.start();
250      }
251    }, 1000, TimeUnit.MILLISECONDS);
252
253    waitForClusterOnline(master);
254  }
255
256  /**
257   * Tests region sever reportForDuty with a non-default environment edge
258   */
259  @Test
260  public void testReportForDutyWithEnvironmentEdge() throws Exception {
261    // Start a master and wait for it to become the active/primary master.
262    // Use a random unique port
263    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
264    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
265    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
266    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);
267
268    // master has a rs. defaultMinToStart = 2
269    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
270    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
271      tablesOnMaster ? 2 : 1);
272    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
273      tablesOnMaster ? 2 : 1);
274
275    // Inject non-default environment edge
276    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
277    EnvironmentEdgeManager.injectEdge(edge);
278    master = cluster.addMaster();
279    rs = cluster.addRegionServer();
280    LOG.debug("Starting master: " + master.getMaster().getServerName());
281    master.start();
282    rs.start();
283    waitForClusterOnline(master);
284  }
285
286  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
287    while (true) {
288      if (master.getMaster().isInitialized()) {
289        break;
290      }
291      Thread.sleep(SLEEP_INTERVAL);
292      LOG.debug("Waiting for master to come online ...");
293    }
294    rs.waitForServerOnline();
295  }
296
297  private void waitForSecondRsStarted() throws InterruptedException {
298    while (true) {
299      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
300        break;
301      }
302      Thread.sleep(SLEEP_INTERVAL);
303      LOG.debug("Waiting 2nd RS to be started ...");
304    }
305  }
306
307  // Create a Region Server that provide a hook so that we can wait for the master switch over
308  // before continuing reportForDuty to the mater.
309  // The idea is that we get a RPC connection to the first active master, then we wait.
310  // The first master goes down, the second master becomes the active master. The region
311  // server continues reportForDuty. It should succeed with the new master.
312  public static class MyRegionServer extends MiniHBaseClusterRegionServer {
313
314    private ServerName sn;
315    // This flag is to make sure this rs has obtained the rpcStub to the first master.
316    // The first master will go down after this.
317    private boolean rpcStubCreatedFlag = false;
318    private boolean masterChanged = false;
319
320    public MyRegionServer(Configuration conf)
321      throws IOException, KeeperException, InterruptedException {
322      super(conf);
323    }
324
325    @Override
326    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
327      sn = super.createRegionServerStatusStub(refresh);
328      rpcStubCreatedFlag = true;
329
330      // Wait for master switch over. Only do this for the second region server.
331      while (!masterChanged) {
332        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
333        if (newSn != null && !newSn.equals(sn)) {
334          masterChanged = true;
335          break;
336        }
337        try {
338          Thread.sleep(SLEEP_INTERVAL);
339        } catch (InterruptedException e) {
340          return null;
341        }
342        LOG.debug("Waiting for master switch over ... ");
343      }
344      return sn;
345    }
346
347    public boolean getRpcStubCreatedFlag() {
348      return rpcStubCreatedFlag;
349    }
350  }
351}