/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Suspend then resume a ratio of the regionservers in a rolling fashion. At each step, either
 * suspend a server or resume one, sleeping (sleepTime) in between steps. The parameter
 * maxSuspendedServers limits the number of servers that can be suspended at the same time.
 */
public class RollingBatchSuspendResumeRsAction extends Action {
  private static final Logger LOG =
    LoggerFactory.getLogger(RollingBatchSuspendResumeRsAction.class);
  private final float ratio;
  private final long sleepTime;
  private final int maxSuspendedServers; // maximum number of suspended servers at any given time
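
  /**
   * Convenience constructor; delegates to the three-argument constructor with a default of at
   * most 5 servers suspended at any one time.
   */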
  public RollingBatchSuspendResumeRsAction(long sleepTime, float ratio) {
    this(sleepTime, ratio, 5);
  }

  public RollingBatchSuspendResumeRsAction(long sleepTime, float ratio, int maxSuspendedServers) {
    this.ratio = ratio;
    this.sleepTime = sleepTime;
    this.maxSuspendedServers = maxSuspendedServers;
  }

  enum SuspendOrResume {
    SUSPEND,
    RESUME
  }

  @Override
  protected Logger getLogger() {
    return LOG;
  }
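
  /**
   * Restart any resumed region server that the master no longer reports as live; see the
   * HBASE-29206 note in {@link #perform()}.
   */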
  private void confirmResumed(Set<ServerName> resumedServers) {
    if (resumedServers.isEmpty()) {
      return;
    }
    try {
      Set<Address> addrs =
        resumedServers.stream().map(ServerName::getAddress).collect(Collectors.toSet());
      cluster.getClusterMetrics().getLiveServerMetrics().keySet().stream()
        .map(ServerName::getAddress).forEach(addrs::remove);
      for (Address addr : addrs) {
        LOG.warn("Region server {} crashed after resuming, starting it", addr);
        startRs(ServerName.valueOf(addr, -1));
      }
    } catch (IOException e) {
      LOG.warn("Failed to check liveness for region servers {}", resumedServers, e);
    }
  }

  @Override
  public void perform() throws Exception {
    getLogger().info("Performing action: Rolling batch suspending {}% of region servers",
      (int) (ratio * 100));
    List<ServerName> selectedServers = selectServers();
    Queue<ServerName> serversToBeSuspended = new ArrayDeque<>(selectedServers);
    Queue<ServerName> suspendedServers = new ArrayDeque<>();
    // After resuming, the region server will usually crash soon afterwards because its session has
    // expired, and if the region server is not started by 'autostart', it will stay down forever.
    // So here we record these region servers and make sure that they are all alive before exiting
    // this action. See HBASE-29206 for more details.
    Set<ServerName> resumedServers = new HashSet<>();
    Random rand = ThreadLocalRandom.current();
    // loop while there are servers to be suspended or suspended servers to be resumed
    while (
      (!serversToBeSuspended.isEmpty() || !suspendedServers.isEmpty()) && !context.isStopping()
    ) {
      final SuspendOrResume action;
      if (serversToBeSuspended.isEmpty()) { // no more servers to suspend
        action = SuspendOrResume.RESUME;
      } else if (suspendedServers.isEmpty()) { // no more servers to resume
        action = SuspendOrResume.SUSPEND;
      } else if (suspendedServers.size() >= maxSuspendedServers) {
        // we have too many suspended servers, don't suspend any more
        action = SuspendOrResume.RESUME;
      } else {
        // do a coin toss
        action = rand.nextBoolean() ? SuspendOrResume.SUSPEND : SuspendOrResume.RESUME;
      }
      ServerName server;
      switch (action) {
        case SUSPEND:
          server = serversToBeSuspended.remove();
          try {
            suspendRs(server);
          } catch (Shell.ExitCodeException e) {
            LOG.warn("Problem suspending but presume successful; code={}", e.getExitCode(), e);
          }
          suspendedServers.add(server);
          break;
        case RESUME:
          server = suspendedServers.remove();
          try {
            resumeRs(server);
          } catch (Shell.ExitCodeException e) {
            LOG.info("Problem resuming, will retry; code={}", e.getExitCode(), e);
          }
          resumedServers.add(server);
          break;
      }

      getLogger().info("Sleeping for: {}", sleepTime);
      Threads.sleep(sleepTime);
      confirmResumed(resumedServers);
    }
  }

  protected List<ServerName> selectServers() throws IOException {
    return PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(), ratio);
  }
}