001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.chaos.actions;
020
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
033
034/**
035 * Suspend then resume a ratio of the regionservers in a rolling fashion. At each step, either
036 * suspend a server, or resume one, sleeping (sleepTime) in between steps. The parameter
037 * maxSuspendedServers limits the maximum number of servers that can be down at the same time
038 * during rolling restarts.
039 */
040public class RollingBatchSuspendResumeRsAction extends Action {
041  private static final Logger LOG =
042      LoggerFactory.getLogger(RollingBatchSuspendResumeRsAction.class);
043  private float ratio;
044  private long sleepTime;
045  private int maxSuspendedServers; // number of maximum suspended servers at any given time.
046
047  public RollingBatchSuspendResumeRsAction(long sleepTime, float ratio) {
048    this(sleepTime, ratio, 5);
049  }
050
051  public RollingBatchSuspendResumeRsAction(long sleepTime, float ratio, int maxSuspendedServers) {
052    this.ratio = ratio;
053    this.sleepTime = sleepTime;
054    this.maxSuspendedServers = maxSuspendedServers;
055  }
056
057  enum SuspendOrResume {
058    SUSPEND, RESUME
059  }
060
061  @Override
062  public void perform() throws Exception {
063    LOG.info(String.format("Performing action: Rolling batch restarting %d%% of region servers",
064        (int) (ratio * 100)));
065    List<ServerName> selectedServers = selectServers();
066
067    Queue<ServerName> serversToBeSuspended = new LinkedList<>(selectedServers);
068    Queue<ServerName> suspendedServers = new LinkedList<>();
069
070    // loop while there are servers to be suspended or suspended servers to be resumed
071    while ((!serversToBeSuspended.isEmpty() || !suspendedServers.isEmpty()) && !context
072        .isStopping()) {
073      SuspendOrResume action;
074
075      if (serversToBeSuspended.isEmpty()) { // no more servers to suspend
076        action = SuspendOrResume.RESUME;
077      } else if (suspendedServers.isEmpty()) {
078        action = SuspendOrResume.SUSPEND; // no more servers to resume
079      } else if (suspendedServers.size() >= maxSuspendedServers) {
080        // we have too many suspended servers. Don't suspend any more
081        action = SuspendOrResume.RESUME;
082      } else {
083        // do a coin toss
084        action = RandomUtils.nextBoolean() ? SuspendOrResume.SUSPEND : SuspendOrResume.RESUME;
085      }
086
087      ServerName server;
088      switch (action) {
089        case SUSPEND:
090          server = serversToBeSuspended.remove();
091          try {
092            suspendRs(server);
093          } catch (Shell.ExitCodeException e) {
094            LOG.warn("Problem suspending but presume successful; code={}", e.getExitCode(), e);
095          }
096          suspendedServers.add(server);
097          break;
098        case RESUME:
099          server = suspendedServers.remove();
100          try {
101            resumeRs(server);
102          } catch (Shell.ExitCodeException e) {
103            LOG.info("Problem resuming, will retry; code={}", e.getExitCode(), e);
104          }
105          break;
106      }
107
108      LOG.info("Sleeping for:{}", sleepTime);
109      Threads.sleep(sleepTime);
110    }
111  }
112
113  protected List<ServerName> selectServers() throws IOException {
114    return PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(), ratio);
115  }
116
117}