001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.stream.Collectors;
023import org.apache.hadoop.conf.Configured;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028@InterfaceAudience.Private
029public class ZNodeClusterManager extends Configured implements ClusterManager {
030  private static final Logger LOG = LoggerFactory.getLogger(ZNodeClusterManager.class.getName());
031  private static final String SIGKILL = "SIGKILL";
032  private static final String SIGSTOP = "SIGSTOP";
033  private static final String SIGCONT = "SIGCONT";
034
035  public ZNodeClusterManager() {
036  }
037
038  private String getZKQuorumServersStringFromHbaseConfig() {
039    String port = Integer.toString(getConf().getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181));
040    String[] serverHosts = getConf().getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost");
041    for (int i = 0; i < serverHosts.length; i++) {
042      serverHosts[i] = serverHosts[i] + ":" + port;
043    }
044    return Arrays.asList(serverHosts).stream().collect(Collectors.joining(","));
045  }
046
047  private String createZNode(String hostname, String cmd) throws IOException {
048    LOG.info(
049      "Zookeeper Mode enabled sending command to zookeeper + " + cmd + "hostname:" + hostname);
050    ChaosZKClient chaosZKClient = new ChaosZKClient(getZKQuorumServersStringFromHbaseConfig());
051    return chaosZKClient.submitTask(new ChaosZKClient.TaskObject(cmd, hostname));
052  }
053
054  protected HBaseClusterManager.CommandProvider getCommandProvider(ServiceType service)
055    throws IOException {
056    switch (service) {
057      case HADOOP_DATANODE:
058      case HADOOP_NAMENODE:
059        return new HBaseClusterManager.HadoopShellCommandProvider(getConf());
060      case ZOOKEEPER_SERVER:
061        return new HBaseClusterManager.ZookeeperShellCommandProvider(getConf());
062      default:
063        return new HBaseClusterManager.HBaseShellCommandProvider(getConf());
064    }
065  }
066
067  public void signal(ServiceType service, String signal, String hostname) throws IOException {
068    createZNode(hostname,
069      CmdType.exec.toString() + getCommandProvider(service).signalCommand(service, signal));
070  }
071
072  private void createOpCommand(String hostname, ServiceType service,
073    HBaseClusterManager.CommandProvider.Operation op) throws IOException {
074    createZNode(hostname,
075      CmdType.exec.toString() + getCommandProvider(service).getCommand(service, op));
076  }
077
078  @Override
079  public void start(ServiceType service, String hostname, int port) throws IOException {
080    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.START);
081  }
082
083  @Override
084  public void stop(ServiceType service, String hostname, int port) throws IOException {
085    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.STOP);
086  }
087
088  @Override
089  public void restart(ServiceType service, String hostname, int port) throws IOException {
090    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.RESTART);
091  }
092
093  @Override
094  public void kill(ServiceType service, String hostname, int port) throws IOException {
095    signal(service, SIGKILL, hostname);
096  }
097
098  @Override
099  public void suspend(ServiceType service, String hostname, int port) throws IOException {
100    signal(service, SIGSTOP, hostname);
101  }
102
103  @Override
104  public void resume(ServiceType service, String hostname, int port) throws IOException {
105    signal(service, SIGCONT, hostname);
106  }
107
108  @Override
109  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
110    return Boolean.parseBoolean(createZNode(hostname,
111      CmdType.bool.toString() + getCommandProvider(service).isRunningCommand(service)));
112  }
113
114  enum CmdType {
115    exec,
116    bool
117  }
118}