001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.StringWriter;
025import java.util.concurrent.ScheduledThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import org.apache.commons.lang3.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.LocalHBaseCluster;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
035import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.ServerManager;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
041import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
042import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
043import org.apache.hadoop.hbase.util.Threads;
044import org.apache.zookeeper.KeeperException;
045import org.junit.After;
046import org.junit.Before;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054
055@Category(LargeTests.class)
056public class TestRegionServerReportForDuty {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060      HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);
061
062  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);
063
064  private static final long SLEEP_INTERVAL = 500;
065
066  private HBaseTestingUtil testUtil;
067  private LocalHBaseCluster cluster;
068  private RegionServerThread rs;
069  private RegionServerThread rs2;
070  private MasterThread master;
071  private MasterThread backupMaster;
072
073  @Before
074  public void setUp() throws Exception {
075    testUtil = new HBaseTestingUtil();
076    testUtil.startMiniDFSCluster(1);
077    testUtil.startMiniZKCluster(1);
078    testUtil.createRootDir();
079    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
080  }
081
082  @After
083  public void tearDown() throws Exception {
084    cluster.shutdown();
085    cluster.join();
086    testUtil.shutdownMiniZKCluster();
087    testUtil.shutdownMiniDFSCluster();
088  }
089
090  private static class LogCapturer {
091    private StringWriter sw = new StringWriter();
092    private org.apache.logging.log4j.core.appender.WriterAppender appender;
093    private org.apache.logging.log4j.core.Logger logger;
094
095    LogCapturer(org.apache.logging.log4j.core.Logger logger) {
096      this.logger = logger;
097      this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder()
098        .setName("test").setTarget(sw).build();
099      this.logger.addAppender(this.appender);
100    }
101
102    String getOutput() {
103      return sw.toString();
104    }
105
106    public void stopCapturing() {
107      this.logger.removeAppender(this.appender);
108    }
109  }
110
111  /**
112   * This test HMaster class will always throw ServerNotRunningYetException if checked.
113   */
114  public static class NeverInitializedMaster extends HMaster {
115    public NeverInitializedMaster(Configuration conf) throws IOException {
116      super(conf);
117    }
118
119    @Override
120    protected void checkServiceStarted() throws ServerNotRunningYetException {
121      throw new ServerNotRunningYetException("Server is not running yet");
122    }
123  }
124
125  /**
126   * Tests region server should backoff to report for duty if master is not ready.
127   */
128  @Test
129  public void testReportForDutyBackoff() throws IOException, InterruptedException {
130    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
131    master = cluster.addMaster();
132    master.start();
133
134    LogCapturer capturer =
135      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
136        .getLogger(HRegionServer.class));
137    // Set sleep interval relatively low so that exponential backoff is more demanding.
138    int msginterval = 100;
139    cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
140    rs = cluster.addRegionServer();
141    rs.start();
142
143    int interval = 10_000;
144    Thread.sleep(interval);
145    capturer.stopCapturing();
146    String output = capturer.getOutput();
147    LOG.info("{}", output);
148    String failMsg = "reportForDuty failed;";
149    int count = StringUtils.countMatches(output, failMsg);
150
151    // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
152    // Ideally we can assert the exact retry count. We relax here to tolerate contention error.
153    int expectedRetry = (int)Math.ceil(Math.log(interval - msginterval));
154    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d",
155        count, expectedRetry / 2), count > expectedRetry / 2);
156    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d",
157        count, expectedRetry * 2), count < expectedRetry * 2);
158  }
159
160  /**
161   * Tests region sever reportForDuty with backup master becomes primary master after
162   * the first master goes away.
163   */
164  @Test
165  public void testReportForDutyWithMasterChange() throws Exception {
166
167    // Start a master and wait for it to become the active/primary master.
168    // Use a random unique port
169    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
170    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
171    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
172    master = cluster.addMaster();
173    rs = cluster.addRegionServer();
174    LOG.debug("Starting master: " + master.getMaster().getServerName());
175    master.start();
176    rs.start();
177
178    waitForClusterOnline(master);
179
180    // Add a 2nd region server
181    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
182    rs2 = cluster.addRegionServer();
183    // Start the region server. This region server will refresh RPC connection
184    // from the current active master to the next active master before completing
185    // reportForDuty
186    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
187    rs2.start();
188
189    waitForSecondRsStarted();
190
191    // Stop the current master.
192    master.getMaster().stop("Stopping master");
193
194    // Start a new master and use another random unique port
195    // Also let it wait for exactly 2 region severs to report in.
196    // TODO: Add handling bindexception. Random port is not enough!!! Flakie test!
197    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
198    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
199    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
200    backupMaster = cluster.addMaster();
201    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
202    backupMaster.start();
203
204    waitForClusterOnline(backupMaster);
205
206    // Do some checking/asserts here.
207    assertTrue(backupMaster.getMaster().isActiveMaster());
208    assertTrue(backupMaster.getMaster().isInitialized());
209    assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2);
210
211  }
212  
213  /**
214   * Tests region sever reportForDuty with RS RPC retry
215   */
216  @Test
217  public void testReportForDutyWithRSRpcRetry() throws Exception {
218    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
219      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
220        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
221
222    // Start a master and wait for it to become the active/primary master.
223    // Use a random unique port
224    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
225    // Override the default RS RPC retry interval of 100ms to 300ms
226    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
227    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
228    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
229    master = cluster.addMaster();
230    rs = cluster.addRegionServer();
231    LOG.debug("Starting master: " + master.getMaster().getServerName());
232    master.start();
233    // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block
234    scheduledThreadPoolExecutor.schedule(new Runnable() {
235      @Override
236      public void run() {
237        rs.start();
238      }
239    }, 1000, TimeUnit.MILLISECONDS);
240
241    waitForClusterOnline(master);
242  }
243
244  /**
245   * Tests region sever reportForDuty with a non-default environment edge
246   */
247  @Test
248  public void testReportForDutyWithEnvironmentEdge() throws Exception {
249    // Start a master and wait for it to become the active/primary master.
250    // Use a random unique port
251    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
252    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
253    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
254    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);
255
256    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
257    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
258
259    // Inject non-default environment edge
260    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
261    EnvironmentEdgeManager.injectEdge(edge);
262    master = cluster.addMaster();
263    rs = cluster.addRegionServer();
264    LOG.debug("Starting master: " + master.getMaster().getServerName());
265    master.start();
266    rs.start();
267    waitForClusterOnline(master);
268  }
269
270  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
271    while (true) {
272      if (master.getMaster().isInitialized()) {
273        break;
274      }
275      Thread.sleep(SLEEP_INTERVAL);
276      LOG.debug("Waiting for master to come online ...");
277    }
278    rs.waitForServerOnline();
279  }
280
281  private void waitForSecondRsStarted() throws InterruptedException {
282    while (true) {
283      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
284        break;
285      }
286      Thread.sleep(SLEEP_INTERVAL);
287      LOG.debug("Waiting 2nd RS to be started ...");
288    }
289  }
290
291  // Create a Region Server that provide a hook so that we can wait for the master switch over
292  // before continuing reportForDuty to the mater.
293  // The idea is that we get a RPC connection to the first active master, then we wait.
294  // The first master goes down, the second master becomes the active master. The region
295  // server continues reportForDuty. It should succeed with the new master.
296  public static class MyRegionServer extends MiniHBaseClusterRegionServer {
297
298    private ServerName sn;
299    // This flag is to make sure this rs has obtained the rpcStub to the first master.
300    // The first master will go down after this.
301    private boolean rpcStubCreatedFlag = false;
302    private boolean masterChanged = false;
303
304    public MyRegionServer(Configuration conf) throws IOException, KeeperException,
305        InterruptedException {
306      super(conf);
307    }
308
309    @Override
310    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
311      sn = super.createRegionServerStatusStub(refresh);
312      rpcStubCreatedFlag = true;
313
314      // Wait for master switch over. Only do this for the second region server.
315      while (!masterChanged) {
316        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
317        if (newSn != null && !newSn.equals(sn)) {
318          masterChanged = true;
319          break;
320        }
321        try {
322          Thread.sleep(SLEEP_INTERVAL);
323        } catch (InterruptedException e) {
324          return null;
325        }
326        LOG.debug("Waiting for master switch over ... ");
327      }
328      return sn;
329    }
330
331    public boolean getRpcStubCreatedFlag() {
332      return rpcStubCreatedFlag;
333    }
334  }
335}