/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A (possibly mischievous) action that the ChaosMonkey can perform.
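 * <p>Concrete actions subclass {@code Action} and override {@link #perform()}. A minimal
 * sketch of such a subclass (the class name here is hypothetical):
 * <pre>{@code
 * public class KillRandomRsAction extends Action {
 *   public void perform() throws Exception {
 *     ServerName[] servers = getCurrentServers();
 *     if (servers.length > 0) {
 *       killRs(servers[RandomUtils.nextInt(0, servers.length)]);
 *     }
 *   }
 * }
 * }</pre>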
 */
public class Action {

  public static final String KILL_MASTER_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killmastertimeout";
  public static final String START_MASTER_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startmastertimeout";
  public static final String KILL_RS_TIMEOUT_KEY = "hbase.chaosmonkey.action.killrstimeout";
  public static final String START_RS_TIMEOUT_KEY = "hbase.chaosmonkey.action.startrstimeout";
  public static final String KILL_ZK_NODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killzknodetimeout";
  public static final String START_ZK_NODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startzknodetimeout";
  public static final String KILL_DATANODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killdatanodetimeout";
  public static final String START_DATANODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startdatanodetimeout";

  private static final Logger LOG = LoggerFactory.getLogger(Action.class);

  protected static final long KILL_MASTER_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_MASTER_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_RS_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_RS_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_ZK_NODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_ZK_NODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_DATANODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_DATANODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;

  protected ActionContext context;
  protected HBaseCluster cluster;
  protected ClusterMetrics initialStatus;
  protected ServerName[] initialServers;

  protected long killMasterTimeout;
  protected long startMasterTimeout;
  protected long killRsTimeout;
  protected long startRsTimeout;
  protected long killZkNodeTimeout;
  protected long startZkNodeTimeout;
  protected long killDataNodeTimeout;
  protected long startDataNodeTimeout;

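  /**
   * Initializes the action: caches the cluster handle and the initial cluster metrics, and reads
   * the kill/start timeouts from the configuration, falling back to the
   * {@code PolicyBasedChaosMonkey.TIMEOUT} based defaults.
   */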
  public void init(ActionContext context) throws IOException {
    this.context = context;
    cluster = context.getHBaseCluster();
    initialStatus = cluster.getInitialClusterMetrics();
    Collection<ServerName> regionServers = initialStatus.getLiveServerMetrics().keySet();
    initialServers = regionServers.toArray(new ServerName[regionServers.size()]);

    killMasterTimeout = cluster.getConf().getLong(KILL_MASTER_TIMEOUT_KEY,
      KILL_MASTER_TIMEOUT_DEFAULT);
    startMasterTimeout = cluster.getConf().getLong(START_MASTER_TIMEOUT_KEY,
      START_MASTER_TIMEOUT_DEFAULT);
    killRsTimeout = cluster.getConf().getLong(KILL_RS_TIMEOUT_KEY, KILL_RS_TIMEOUT_DEFAULT);
    startRsTimeout = cluster.getConf().getLong(START_RS_TIMEOUT_KEY, START_RS_TIMEOUT_DEFAULT);
    killZkNodeTimeout = cluster.getConf().getLong(KILL_ZK_NODE_TIMEOUT_KEY,
      KILL_ZK_NODE_TIMEOUT_DEFAULT);
    startZkNodeTimeout = cluster.getConf().getLong(START_ZK_NODE_TIMEOUT_KEY,
      START_ZK_NODE_TIMEOUT_DEFAULT);
    killDataNodeTimeout = cluster.getConf().getLong(KILL_DATANODE_TIMEOUT_KEY,
      KILL_DATANODE_TIMEOUT_DEFAULT);
    startDataNodeTimeout = cluster.getConf().getLong(START_DATANODE_TIMEOUT_KEY,
      START_DATANODE_TIMEOUT_DEFAULT);
  }

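  /**
   * Performs this action. The default implementation is a no-op; subclasses override it with the
   * actual chaos behavior.
   */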
  public void perform() throws Exception { }

  /** Returns the current region servers, excluding the active master. */
  protected ServerName[] getCurrentServers() throws IOException {
    ClusterMetrics clusterStatus = cluster.getClusterMetrics();
    Collection<ServerName> regionServers = clusterStatus.getLiveServerMetrics().keySet();
    int count = regionServers == null ? 0 : regionServers.size();
    if (count <= 0) {
      return new ServerName[] {};
    }
    ServerName master = clusterStatus.getMasterName();
    if (master == null || !regionServers.contains(master)) {
      return regionServers.toArray(new ServerName[count]);
    }
    if (count == 1) {
      return new ServerName[] {};
    }
    ArrayList<ServerName> tmp = new ArrayList<>(regionServers);
    tmp.remove(master);
    return tmp.toArray(new ServerName[count - 1]);
  }

  protected void killMaster(ServerName server) throws IOException {
    LOG.info("Killing master {}", server);
    cluster.killMaster(server);
    cluster.waitForMasterToStop(server, killMasterTimeout);
    LOG.info("Killed master {}", server);
  }

  protected void startMaster(ServerName server) throws IOException {
    LOG.info("Starting master {}", server.getHostname());
    cluster.startMaster(server.getHostname(), server.getPort());
    cluster.waitForActiveAndReadyMaster(startMasterTimeout);
    LOG.info("Started master {}", server.getHostname());
  }

  protected void killRs(ServerName server) throws IOException {
    LOG.info("Killing regionserver {}", server);
    cluster.killRegionServer(server);
    cluster.waitForRegionServerToStop(server, killRsTimeout);
    LOG.info("Killed regionserver {}. Reported number of live region servers: {}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startRs(ServerName server) throws IOException {
    LOG.info("Starting regionserver {}", server.getAddress());
    cluster.startRegionServer(server.getHostname(), server.getPort());
    cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), startRsTimeout);
    LOG.info("Started regionserver {}. Reported number of live region servers: {}",
      server.getAddress(), cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void killZKNode(ServerName server) throws IOException {
    LOG.info("Killing zookeeper node {}", server);
    cluster.killZkNode(server);
    cluster.waitForZkNodeToStop(server, killZkNodeTimeout);
    LOG.info("Killed zookeeper node {}. Reported number of live region servers: {}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startZKNode(ServerName server) throws IOException {
    LOG.info("Starting zookeeper node {}", server.getHostname());
    cluster.startZkNode(server.getHostname(), server.getPort());
    cluster.waitForZkNodeToStart(server, startZkNodeTimeout);
    LOG.info("Started zookeeper node {}", server);
  }

  protected void killDataNode(ServerName server) throws IOException {
    LOG.info("Killing datanode {}", server);
    cluster.killDataNode(server);
    cluster.waitForDataNodeToStop(server, killDataNodeTimeout);
    LOG.info("Killed datanode {}. Reported number of live region servers: {}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startDataNode(ServerName server) throws IOException {
    LOG.info("Starting datanode {}", server.getHostname());
    cluster.startDataNode(server);
    cluster.waitForDataNodeToStart(server, startDataNodeTimeout);
    LOG.info("Started datanode {}", server);
  }

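  /**
   * Picks a random {@code fractionOfRegions} of the regions on each live server and moves each
   * victim to a randomly chosen member of {@code toServers}, bailing out early if the context
   * starts stopping. Note that victims are drawn from every server in {@code clusterStatus};
   * {@code fromServers} is only used for logging.
   */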
  protected void unbalanceRegions(ClusterMetrics clusterStatus,
      List<ServerName> fromServers, List<ServerName> toServers,
      double fractionOfRegions) throws Exception {
    List<byte[]> victimRegions = new LinkedList<>();
    for (Map.Entry<ServerName, ServerMetrics> entry
      : clusterStatus.getLiveServerMetrics().entrySet()) {
      ServerName sn = entry.getKey();
      ServerMetrics serverLoad = entry.getValue();
      // Copy the region list so victims can be removed as they are picked.
      List<byte[]> regions = new LinkedList<>(serverLoad.getRegionMetrics().keySet());
      int victimRegionCount = (int) Math.ceil(fractionOfRegions * regions.size());
      LOG.debug("Removing {} regions from {}", victimRegionCount, sn);
      for (int i = 0; i < victimRegionCount; ++i) {
        int victimIx = RandomUtils.nextInt(0, regions.size());
        String regionId = HRegionInfo.encodeRegionName(regions.remove(victimIx));
        victimRegions.add(Bytes.toBytes(regionId));
      }
    }

    LOG.info("Moving {} regions from {} servers to {} different servers",
      victimRegions.size(), fromServers.size(), toServers.size());
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    for (byte[] victimRegion : victimRegions) {
      // Don't keep moving regions if we're trying to stop the monkey.
      if (context.isStopping()) {
        break;
      }
      int targetIx = RandomUtils.nextInt(0, toServers.size());
      admin.move(victimRegion, Bytes.toBytes(toServers.get(targetIx).getServerName()));
    }
  }

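  /**
   * Asks the master to run the balancer. A failed or refused balance is logged but not
   * propagated, so the chaos run can continue.
   */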
  protected void forceBalancer() throws Exception {
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    boolean result = false;
    try {
      result = admin.balancer();
    } catch (Exception e) {
      LOG.warn("Caught exception while running the balancer", e);
    }
    if (!result) {
      LOG.error("Balancer run did not succeed");
    }
  }

  public Configuration getConf() {
    return cluster.getConf();
  }

  /**
   * Apply a transform to all column families in a given table. If the table has no column
   * families or the context is stopping, this does nothing.
   * @param tableName the table to modify
   * @param transform the modification to perform. Callers will have the
   *                  column family name as a string and a column family descriptor builder
   *                  available to them
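   * <p>For example, to cap every column family at three versions (the particular setting is
   * only for illustration):
   * <pre>{@code
   * modifyAllTableColumns(tableName, (name, builder) -> builder.setMaxVersions(3));
   * }</pre>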
   */
  protected void modifyAllTableColumns(TableName tableName,
      BiConsumer<String, ColumnFamilyDescriptorBuilder> transform) throws IOException {
    HBaseTestingUtility util = this.context.getHBaseIntegrationTestingUtility();
    Admin admin = util.getAdmin();

    TableDescriptor tableDescriptor = admin.getDescriptor(tableName);
    ColumnFamilyDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies();

    if (columnDescriptors == null || columnDescriptors.length == 0) {
      return;
    }

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor);
    for (ColumnFamilyDescriptor descriptor : columnDescriptors) {
      ColumnFamilyDescriptorBuilder cfd = ColumnFamilyDescriptorBuilder.newBuilder(descriptor);
      transform.accept(descriptor.getNameAsString(), cfd);
      builder.modifyColumnFamily(cfd.build());
    }

    // Don't try the modify if we're stopping
    if (this.context.isStopping()) {
      return;
    }
    admin.modifyTable(builder.build());
  }

  /**
   * Apply a transform to all column families in a given table. If the table has no column
   * families or the context is stopping, this does nothing.
   * @param tableName the table to modify
   * @param transform the modification to perform on each column family descriptor builder
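   * <p>For example, to set a one-day TTL on every column family (the value is only for
   * illustration):
   * <pre>{@code
   * modifyAllTableColumns(tableName, builder -> builder.setTimeToLive(86400));
   * }</pre>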
   */
  protected void modifyAllTableColumns(TableName tableName,
      Consumer<ColumnFamilyDescriptorBuilder> transform) throws IOException {
    modifyAllTableColumns(tableName, (name, cfd) -> transform.accept(cfd));
  }

  /**
   * Context for Actions.
   */
  public static class ActionContext {
    private final IntegrationTestingUtility util;

    public ActionContext(IntegrationTestingUtility util) {
      this.util = util;
    }

    public IntegrationTestingUtility getHBaseIntegrationTestingUtility() {
      return util;
    }

    public HBaseCluster getHBaseCluster() {
      return util.getHBaseClusterInterface();
    }

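    /**
     * Returns whether the chaos monkey is winding down. Always {@code false} here; subclasses may
     * override this so in-flight actions can bail out early.
     */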
    public boolean isStopping() {
      return false;
    }
  }
}