/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A (possibly mischievous) action that the ChaosMonkey can perform.
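 * <p>
 * Subclasses implement {@link #perform()} and can build on the protected helpers defined here.
 * A minimal sketch of a custom action (the class and table name below are hypothetical):
 * <pre>
 * public class FlushTableAction extends Action {
 *   private final TableName tableName = TableName.valueOf("usertable");
 *
 *   &#64;Override
 *   public void perform() throws Exception {
 *     // context and cluster are set up by init(ActionContext) before perform() is called.
 *     context.getHBaseIntegrationTestingUtility().getAdmin().flush(tableName);
 *   }
 * }
 * </pre>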
 */
public class Action {

  public static final String KILL_MASTER_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killmastertimeout";
  public static final String START_MASTER_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startmastertimeout";
  public static final String KILL_RS_TIMEOUT_KEY = "hbase.chaosmonkey.action.killrstimeout";
  public static final String START_RS_TIMEOUT_KEY = "hbase.chaosmonkey.action.startrstimeout";
  public static final String KILL_ZK_NODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killzknodetimeout";
  public static final String START_ZK_NODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startzknodetimeout";
  public static final String KILL_DATANODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killdatanodetimeout";
  public static final String START_DATANODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startdatanodetimeout";
  public static final String KILL_NAMENODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killnamenodetimeout";
  public static final String START_NAMENODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startnamenodetimeout";

  private static final Logger LOG = LoggerFactory.getLogger(Action.class);

  protected static final long KILL_MASTER_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_MASTER_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_RS_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_RS_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_ZK_NODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_ZK_NODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_DATANODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_DATANODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_NAMENODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_NAMENODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;

  protected ActionContext context;
  protected HBaseCluster cluster;
  protected ClusterMetrics initialStatus;
  protected ServerName[] initialServers;

  protected long killMasterTimeout;
  protected long startMasterTimeout;
  protected long killRsTimeout;
  protected long startRsTimeout;
  protected long killZkNodeTimeout;
  protected long startZkNodeTimeout;
  protected long killDataNodeTimeout;
  protected long startDataNodeTimeout;
  protected long killNameNodeTimeout;
  protected long startNameNodeTimeout;

  public void init(ActionContext context) throws IOException {
    this.context = context;
    cluster = context.getHBaseCluster();
    initialStatus = cluster.getInitialClusterMetrics();
    Collection<ServerName> regionServers = initialStatus.getLiveServerMetrics().keySet();
    initialServers = regionServers.toArray(new ServerName[regionServers.size()]);

    killMasterTimeout = cluster.getConf().getLong(KILL_MASTER_TIMEOUT_KEY,
      KILL_MASTER_TIMEOUT_DEFAULT);
    startMasterTimeout = cluster.getConf().getLong(START_MASTER_TIMEOUT_KEY,
      START_MASTER_TIMEOUT_DEFAULT);
    killRsTimeout = cluster.getConf().getLong(KILL_RS_TIMEOUT_KEY, KILL_RS_TIMEOUT_DEFAULT);
    startRsTimeout = cluster.getConf().getLong(START_RS_TIMEOUT_KEY, START_RS_TIMEOUT_DEFAULT);
    killZkNodeTimeout = cluster.getConf().getLong(KILL_ZK_NODE_TIMEOUT_KEY,
      KILL_ZK_NODE_TIMEOUT_DEFAULT);
    startZkNodeTimeout = cluster.getConf().getLong(START_ZK_NODE_TIMEOUT_KEY,
      START_ZK_NODE_TIMEOUT_DEFAULT);
    killDataNodeTimeout = cluster.getConf().getLong(KILL_DATANODE_TIMEOUT_KEY,
      KILL_DATANODE_TIMEOUT_DEFAULT);
    startDataNodeTimeout = cluster.getConf().getLong(START_DATANODE_TIMEOUT_KEY,
      START_DATANODE_TIMEOUT_DEFAULT);
    killNameNodeTimeout = cluster.getConf().getLong(KILL_NAMENODE_TIMEOUT_KEY,
      KILL_NAMENODE_TIMEOUT_DEFAULT);
    startNameNodeTimeout = cluster.getConf().getLong(START_NAMENODE_TIMEOUT_KEY,
      START_NAMENODE_TIMEOUT_DEFAULT);
  }

  /**
   * Perform the action. The default implementation is a no-op; subclasses override this with
   * the actual chaos.
   */
  public void perform() throws Exception { }

  /** Returns the current region servers, excluding the active and backup masters. */
  protected ServerName[] getCurrentServers() throws IOException {
    ClusterMetrics clusterStatus = cluster.getClusterMetrics();
    Collection<ServerName> regionServers = clusterStatus.getLiveServerMetrics().keySet();
    int count = regionServers == null ? 0 : regionServers.size();
    if (count <= 0) {
      return new ServerName[] {};
    }
    ServerName master = clusterStatus.getMasterName();
    Set<ServerName> masters = new HashSet<>();
    masters.add(master);
    masters.addAll(clusterStatus.getBackupMasterNames());
    ArrayList<ServerName> tmp = new ArrayList<>(count);
    tmp.addAll(regionServers);
    tmp.removeAll(masters);
    return tmp.toArray(new ServerName[tmp.size()]);
  }

  protected void killMaster(ServerName server) throws IOException {
    LOG.info("Killing master {}", server);
    cluster.killMaster(server);
    cluster.waitForMasterToStop(server, killMasterTimeout);
    LOG.info("Killed master {}", server);
  }

  protected void startMaster(ServerName server) throws IOException {
    LOG.info("Starting master {}", server.getHostname());
    cluster.startMaster(server.getHostname(), server.getPort());
    cluster.waitForActiveAndReadyMaster(startMasterTimeout);
    LOG.info("Started master {}", server.getHostname());
  }

  protected void stopRs(ServerName server) throws IOException {
    LOG.info("Stopping regionserver {}", server);
    cluster.stopRegionServer(server);
    cluster.waitForRegionServerToStop(server, killRsTimeout);
    LOG.info("Stopped regionserver {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void suspendRs(ServerName server) throws IOException {
    LOG.info("Suspending regionserver {}", server);
    cluster.suspendRegionServer(server);
    if (!(cluster instanceof MiniHBaseCluster)) {
      cluster.waitForRegionServerToStop(server, killRsTimeout);
    }
    LOG.info("Suspended regionserver {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void resumeRs(ServerName server) throws IOException {
    LOG.info("Resuming regionserver {}", server);
    cluster.resumeRegionServer(server);
    if (!(cluster instanceof MiniHBaseCluster)) {
      cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), startRsTimeout);
    }
    LOG.info("Resumed regionserver {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void killRs(ServerName server) throws IOException {
    LOG.info("Killing regionserver {}", server);
    cluster.killRegionServer(server);
    cluster.waitForRegionServerToStop(server, killRsTimeout);
    LOG.info("Killed regionserver {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startRs(ServerName server) throws IOException {
    LOG.info("Starting regionserver {}", server.getAddress());
    cluster.startRegionServer(server.getHostname(), server.getPort());
    cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), startRsTimeout);
    LOG.info("Started regionserver {}. Reported num of rs:{}", server.getAddress(),
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void killZKNode(ServerName server) throws IOException {
    LOG.info("Killing zookeeper node {}", server);
    cluster.killZkNode(server);
    cluster.waitForZkNodeToStop(server, killZkNodeTimeout);
    LOG.info("Killed zookeeper node {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startZKNode(ServerName server) throws IOException {
    LOG.info("Starting zookeeper node {}", server.getHostname());
    cluster.startZkNode(server.getHostname(), server.getPort());
    cluster.waitForZkNodeToStart(server, startZkNodeTimeout);
    LOG.info("Started zookeeper node {}", server);
  }

  protected void killDataNode(ServerName server) throws IOException {
    LOG.info("Killing datanode {}", server);
    cluster.killDataNode(server);
    cluster.waitForDataNodeToStop(server, killDataNodeTimeout);
    LOG.info("Killed datanode {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startDataNode(ServerName server) throws IOException {
    LOG.info("Starting datanode {}", server.getHostname());
    cluster.startDataNode(server);
    cluster.waitForDataNodeToStart(server, startDataNodeTimeout);
    LOG.info("Started datanode {}", server);
  }

  protected void killNameNode(ServerName server) throws IOException {
    LOG.info("Killing namenode {}", server.getHostname());
    cluster.killNameNode(server);
    cluster.waitForNameNodeToStop(server, killNameNodeTimeout);
    LOG.info("Killed namenode {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startNameNode(ServerName server) throws IOException {
    LOG.info("Starting namenode {}", server.getHostname());
    cluster.startNameNode(server);
    cluster.waitForNameNodeToStart(server, startNameNodeTimeout);
    LOG.info("Started namenode {}", server);
  }

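  /**
   * Select {@code fractionOfRegions} of the regions hosted on each live server at random and
   * move them to randomly chosen members of {@code toServers}; stops early if the monkey is
   * stopping. A hypothetical call ({@code drained} and {@code targets} being lists of
   * {@link ServerName}) that moves half of each server's regions:
   * <pre>
   * unbalanceRegions(cluster.getClusterMetrics(), drained, targets, 0.5);
   * </pre>
   */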
  protected void unbalanceRegions(ClusterMetrics clusterStatus, List<ServerName> fromServers,
      List<ServerName> toServers, double fractionOfRegions) throws Exception {
    List<byte[]> victimRegions = new LinkedList<>();
    for (Map.Entry<ServerName, ServerMetrics> entry
      : clusterStatus.getLiveServerMetrics().entrySet()) {
      ServerName sn = entry.getKey();
      ServerMetrics serverLoad = entry.getValue();
      // Copy the region names so entries can be removed as victims are chosen.
      List<byte[]> regions = new LinkedList<>(serverLoad.getRegionMetrics().keySet());
      int victimRegionCount = (int) Math.ceil(fractionOfRegions * regions.size());
      LOG.debug("Removing {} regions from {}", victimRegionCount, sn);
      for (int i = 0; i < victimRegionCount; ++i) {
        int victimIx = RandomUtils.nextInt(0, regions.size());
        String regionId = HRegionInfo.encodeRegionName(regions.remove(victimIx));
        victimRegions.add(Bytes.toBytes(regionId));
      }
    }

    LOG.info("Moving {} regions from {} servers to {} different servers", victimRegions.size(),
      fromServers.size(), toServers.size());
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    for (byte[] victimRegion : victimRegions) {
      // Don't keep moving regions if we're trying to stop the monkey.
      if (context.isStopping()) {
        break;
      }
      int targetIx = RandomUtils.nextInt(0, toServers.size());
      admin.move(victimRegion, toServers.get(targetIx));
    }
  }

  /**
   * Ask the master to run the balancer once. Failures are logged rather than rethrown.
   */
  protected void forceBalancer() throws Exception {
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    boolean result = false;
    try {
      result = admin.balancer();
    } catch (Exception e) {
      LOG.warn("Got exception while running balancer", e);
    }
    if (!result) {
      LOG.error("Balancer didn't succeed");
    }
  }

  /**
   * Switch the balancer on or off. If {@code synchronous} is true, waits for any outstanding
   * balancer run to complete before returning.
   */
  protected void setBalancer(boolean onOrOff, boolean synchronous) throws Exception {
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    try {
      admin.balancerSwitch(onOrOff, synchronous);
    } catch (Exception e) {
      LOG.warn("Got exception while switching balancer", e);
    }
  }

  public Configuration getConf() {
    return cluster.getConf();
  }

  /**
   * Apply a transform to all column families of the given table. Does nothing if the table has
   * no column families or if the context is stopping.
   * @param tableName the table to modify
   * @param transform the modification to perform. Callers get the column family name as a string
   *                  and a column family descriptor builder to mutate
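   * <p>
   * A hypothetical usage sketch, switching every column family of a table to GZ compression
   * (assumes {@code org.apache.hadoop.hbase.io.compress.Compression} is imported):
   * <pre>
   * modifyAllTableColumns(tableName,
   *   (name, cfd) -&gt; cfd.setCompressionType(Compression.Algorithm.GZ));
   * </pre>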
   */
  protected void modifyAllTableColumns(TableName tableName,
      BiConsumer<String, ColumnFamilyDescriptorBuilder> transform) throws IOException {
    HBaseTestingUtility util = this.context.getHBaseIntegrationTestingUtility();
    Admin admin = util.getAdmin();

    TableDescriptor tableDescriptor = admin.getDescriptor(tableName);
    ColumnFamilyDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies();

    if (columnDescriptors == null || columnDescriptors.length == 0) {
      return;
    }

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor);
    for (ColumnFamilyDescriptor descriptor : columnDescriptors) {
      ColumnFamilyDescriptorBuilder cfd = ColumnFamilyDescriptorBuilder.newBuilder(descriptor);
      transform.accept(descriptor.getNameAsString(), cfd);
      builder.modifyColumnFamily(cfd.build());
    }

    // Don't try the modify if we're stopping.
    if (this.context.isStopping()) {
      return;
    }
    admin.modifyTable(builder.build());
  }

  /**
   * Apply a transform to all column families of the given table. Does nothing if the table has
   * no column families or if the context is stopping.
   * @param tableName the table to modify
   * @param transform the modification to perform on each column family descriptor builder
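   * <p>
   * A hypothetical usage sketch, capping every column family at three versions:
   * {@code modifyAllTableColumns(tableName, cfd -> cfd.setMaxVersions(3))}.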
   */
  protected void modifyAllTableColumns(TableName tableName,
      Consumer<ColumnFamilyDescriptorBuilder> transform) throws IOException {
    modifyAllTableColumns(tableName, (name, cfd) -> transform.accept(cfd));
  }

  /**
   * Context for Actions.
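   * <p>
   * A sketch of the expected lifecycle, where {@code SomeAction} is a hypothetical subclass and
   * {@code util} is the test's {@link IntegrationTestingUtility}:
   * <pre>
   * Action action = new SomeAction();
   * action.init(new Action.ActionContext(util));
   * action.perform();
   * </pre>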
   */
  public static class ActionContext {
    private final IntegrationTestingUtility util;

    public ActionContext(IntegrationTestingUtility util) {
      this.util = util;
    }

    public IntegrationTestingUtility getHBaseIntegrationTestingUtility() {
      return util;
    }

    public HBaseCluster getHBaseCluster() {
      return util.getHBaseClusterInterface();
    }

    public boolean isStopping() {
      return false;
    }
  }
}