001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.StringWriter;
025import java.util.concurrent.ScheduledThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import org.apache.commons.lang3.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.LocalHBaseCluster;
033import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.LoadBalancer;
038import org.apache.hadoop.hbase.master.ServerManager;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
043import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
044import org.apache.hadoop.hbase.util.Threads;
045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
046import org.apache.log4j.Appender;
047import org.apache.log4j.Layout;
048import org.apache.log4j.PatternLayout;
049import org.apache.log4j.WriterAppender;
050import org.apache.zookeeper.KeeperException;
051import org.junit.After;
052import org.junit.Before;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059@Category(LargeTests.class)
060public class TestRegionServerReportForDuty {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064      HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);
065
066  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);
067
068  private static final long SLEEP_INTERVAL = 500;
069
070  private HBaseTestingUtility testUtil;
071  private LocalHBaseCluster cluster;
072  private RegionServerThread rs;
073  private RegionServerThread rs2;
074  private MasterThread master;
075  private MasterThread backupMaster;
076
077  @Before
078  public void setUp() throws Exception {
079    testUtil = new HBaseTestingUtility();
080    testUtil.startMiniDFSCluster(1);
081    testUtil.startMiniZKCluster(1);
082    testUtil.createRootDir();
083    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
084  }
085
086  @After
087  public void tearDown() throws Exception {
088    cluster.shutdown();
089    cluster.join();
090    testUtil.shutdownMiniZKCluster();
091    testUtil.shutdownMiniDFSCluster();
092  }
093
094  /**
095   * LogCapturer is similar to {@link org.apache.hadoop.test.GenericTestUtils.LogCapturer}
096   * except that this implementation has a default appender to the root logger.
097   * Hadoop 2.8+ supports the default appender in the LogCapture it ships and this can be replaced.
098   * TODO: This class can be removed after we upgrade Hadoop dependency.
099   */
100  static class LogCapturer {
101    private StringWriter sw = new StringWriter();
102    private WriterAppender appender;
103    private org.apache.log4j.Logger logger;
104
105    LogCapturer(org.apache.log4j.Logger logger) {
106      this.logger = logger;
107      Appender defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("stdout");
108      if (defaultAppender == null) {
109        defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("console");
110      }
111      final Layout layout = (defaultAppender == null) ? new PatternLayout() :
112          defaultAppender.getLayout();
113      this.appender = new WriterAppender(layout, sw);
114      this.logger.addAppender(this.appender);
115    }
116
117    String getOutput() {
118      return sw.toString();
119    }
120
121    public void stopCapturing() {
122      this.logger.removeAppender(this.appender);
123    }
124  }
125
126  /**
127   * This test HMaster class will always throw ServerNotRunningYetException if checked.
128   */
129  public static class NeverInitializedMaster extends HMaster {
130    public NeverInitializedMaster(Configuration conf) throws IOException {
131      super(conf);
132    }
133
134    @Override
135    protected void checkServiceStarted() throws ServerNotRunningYetException {
136      throw new ServerNotRunningYetException("Server is not running yet");
137    }
138  }
139
140  /**
141   * Tests region server should backoff to report for duty if master is not ready.
142   */
143  @Test
144  public void testReportForDutyBackoff() throws IOException, InterruptedException {
145    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
146    master = cluster.addMaster();
147    master.start();
148
149    LogCapturer capturer = new LogCapturer(org.apache.log4j.Logger.getLogger(HRegionServer.class));
150    // Set sleep interval relatively low so that exponential backoff is more demanding.
151    int msginterval = 100;
152    cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
153    rs = cluster.addRegionServer();
154    rs.start();
155
156    int interval = 10_000;
157    Thread.sleep(interval);
158    capturer.stopCapturing();
159    String output = capturer.getOutput();
160    LOG.info("{}", output);
161    String failMsg = "reportForDuty failed;";
162    int count = StringUtils.countMatches(output, failMsg);
163
164    // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
165    // Ideally we can assert the exact retry count. We relax here to tolerate contention error.
166    int expectedRetry = (int)Math.ceil(Math.log(interval - msginterval));
167    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d",
168        count, expectedRetry / 2), count > expectedRetry / 2);
169    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d",
170        count, expectedRetry * 2), count < expectedRetry * 2);
171  }
172
173  /**
174   * Tests region sever reportForDuty with backup master becomes primary master after
175   * the first master goes away.
176   */
177  @Test
178  public void testReportForDutyWithMasterChange() throws Exception {
179
180    // Start a master and wait for it to become the active/primary master.
181    // Use a random unique port
182    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
183    // master has a rs. defaultMinToStart = 2
184    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
185    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, tablesOnMaster? 2: 1);
186    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, tablesOnMaster? 2: 1);
187    master = cluster.addMaster();
188    rs = cluster.addRegionServer();
189    LOG.debug("Starting master: " + master.getMaster().getServerName());
190    master.start();
191    rs.start();
192
193    waitForClusterOnline(master);
194
195    // Add a 2nd region server
196    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
197    rs2 = cluster.addRegionServer();
198    // Start the region server. This region server will refresh RPC connection
199    // from the current active master to the next active master before completing
200    // reportForDuty
201    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
202    rs2.start();
203
204    waitForSecondRsStarted();
205
206    // Stop the current master.
207    master.getMaster().stop("Stopping master");
208
209    // Start a new master and use another random unique port
210    // Also let it wait for exactly 2 region severs to report in.
211    // TODO: Add handling bindexception. Random port is not enough!!! Flakie test!
212    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
213    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
214      tablesOnMaster? 3: 2);
215    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
216      tablesOnMaster? 3: 2);
217    backupMaster = cluster.addMaster();
218    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
219    backupMaster.start();
220
221    waitForClusterOnline(backupMaster);
222
223    // Do some checking/asserts here.
224    assertTrue(backupMaster.getMaster().isActiveMaster());
225    assertTrue(backupMaster.getMaster().isInitialized());
226    assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(),
227      tablesOnMaster? 3: 2);
228
229  }
230  
231  /**
232   * Tests region sever reportForDuty with RS RPC retry
233   */
234  @Test
235  public void testReportForDutyWithRSRpcRetry() throws Exception {
236    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
237      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
238        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
239
240    // Start a master and wait for it to become the active/primary master.
241    // Use a random unique port
242    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
243    // Override the default RS RPC retry interval of 100ms to 300ms
244    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
245    // master has a rs. defaultMinToStart = 2
246    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
247    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
248      tablesOnMaster ? 2 : 1);
249    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
250      tablesOnMaster ? 2 : 1);
251    master = cluster.addMaster();
252    rs = cluster.addRegionServer();
253    LOG.debug("Starting master: " + master.getMaster().getServerName());
254    master.start();
255    // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block
256    scheduledThreadPoolExecutor.schedule(new Runnable() {
257      @Override
258      public void run() {
259        rs.start();
260      }
261    }, 1000, TimeUnit.MILLISECONDS);
262
263    waitForClusterOnline(master);
264  }
265
266  /**
267   * Tests region sever reportForDuty with manual environment edge
268   */
269  @Test
270  public void testReportForDutyWithEnvironmentEdge() throws Exception {
271    // Start a master and wait for it to become the active/primary master.
272    // Use a random unique port
273    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
274    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
275    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
276    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);
277
278    // master has a rs. defaultMinToStart = 2
279    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
280    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
281      tablesOnMaster ? 2 : 1);
282    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
283      tablesOnMaster ? 2 : 1);
284
285    // Inject manual environment edge for clock skew computation between RS and master
286    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
287    EnvironmentEdgeManager.injectEdge(edge);
288    master = cluster.addMaster();
289    rs = cluster.addRegionServer();
290    LOG.debug("Starting master: " + master.getMaster().getServerName());
291    master.start();
292    rs.start();
293
294    waitForClusterOnline(master);
295  }
296
297  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
298    while (true) {
299      if (master.getMaster().isInitialized()) {
300        break;
301      }
302      Thread.sleep(SLEEP_INTERVAL);
303      LOG.debug("Waiting for master to come online ...");
304    }
305    rs.waitForServerOnline();
306  }
307
308  private void waitForSecondRsStarted() throws InterruptedException {
309    while (true) {
310      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
311        break;
312      }
313      Thread.sleep(SLEEP_INTERVAL);
314      LOG.debug("Waiting 2nd RS to be started ...");
315    }
316  }
317
318  // Create a Region Server that provide a hook so that we can wait for the master switch over
319  // before continuing reportForDuty to the mater.
320  // The idea is that we get a RPC connection to the first active master, then we wait.
321  // The first master goes down, the second master becomes the active master. The region
322  // server continues reportForDuty. It should succeed with the new master.
323  public static class MyRegionServer extends MiniHBaseClusterRegionServer {
324
325    private ServerName sn;
326    // This flag is to make sure this rs has obtained the rpcStub to the first master.
327    // The first master will go down after this.
328    private boolean rpcStubCreatedFlag = false;
329    private boolean masterChanged = false;
330
331    public MyRegionServer(Configuration conf) throws IOException, KeeperException,
332        InterruptedException {
333      super(conf);
334    }
335
336    @Override
337    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
338      sn = super.createRegionServerStatusStub(refresh);
339      rpcStubCreatedFlag = true;
340
341      // Wait for master switch over. Only do this for the second region server.
342      while (!masterChanged) {
343        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
344        if (newSn != null && !newSn.equals(sn)) {
345          masterChanged = true;
346          break;
347        }
348        try {
349          Thread.sleep(SLEEP_INTERVAL);
350        } catch (InterruptedException e) {
351          return null;
352        }
353        LOG.debug("Waiting for master switch over ... ");
354      }
355      return sn;
356    }
357
358    public boolean getRpcStubCreatedFlag() {
359      return rpcStubCreatedFlag;
360    }
361  }
362}