001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.chaos.policies;
019
020import java.util.concurrent.ExecutionException;
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Executors;
023import java.util.concurrent.Future;
024import org.apache.hadoop.hbase.chaos.actions.Action;
025import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
026import org.apache.hadoop.hbase.util.Threads;
027import org.apache.hadoop.util.StringUtils;
028
029import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
030
031/**
032 * Chaos Monkey policy that will run two different actions at the same time. A random action from
033 * each array of actions will be chosen and then run in parallel.
034 */
035public class TwoConcurrentActionPolicy extends PeriodicPolicy {
036  private final Action[] actionsOne;
037  private final Action[] actionsTwo;
038  private final ExecutorService executor;
039
040  public TwoConcurrentActionPolicy(long sleepTime, Action[] actionsOne, Action[] actionsTwo) {
041    super(sleepTime);
042    this.actionsOne = actionsOne;
043    this.actionsTwo = actionsTwo;
044    executor = Executors.newFixedThreadPool(2,
045      new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d").setDaemon(true)
046        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
047  }
048
049  @Override
050  protected void runOneIteration() {
051    Action actionOne = PolicyBasedChaosMonkey.selectRandomItem(actionsOne);
052    Action actionTwo = PolicyBasedChaosMonkey.selectRandomItem(actionsTwo);
053
054    Future fOne = executor.submit(new ActionRunner(actionOne));
055    Future fTwo = executor.submit(new ActionRunner(actionTwo));
056
057    try {
058      fOne.get();
059      fTwo.get();
060    } catch (InterruptedException e) {
061      LOG.warn("Exception occurred during performing action: " + StringUtils.stringifyException(e));
062    } catch (ExecutionException ex) {
063      LOG
064        .warn("Exception occurred during performing action: " + StringUtils.stringifyException(ex));
065    }
066  }
067
068  @Override
069  public void init(PolicyContext context) throws Exception {
070    super.init(context);
071    for (Action a : actionsOne) {
072      a.init(context);
073    }
074    for (Action a : actionsTwo) {
075      a.init(context);
076    }
077  }
078
079  private static class ActionRunner implements Runnable {
080
081    private final Action action;
082
083    public ActionRunner(Action action) {
084
085      this.action = action;
086    }
087
088    @Override
089    public void run() {
090      try {
091        action.perform();
092      } catch (Exception ex) {
093        LOG.warn(
094          "Exception occurred during performing action: " + StringUtils.stringifyException(ex));
095      }
096    }
097  }
098}