001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.stream.Collectors;
024
025import org.apache.hadoop.conf.Configured;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030@InterfaceAudience.Private
031public class ZNodeClusterManager extends Configured implements ClusterManager {
032  private static final Logger LOG = LoggerFactory.getLogger(ZNodeClusterManager.class.getName());
033  private static final String SIGKILL = "SIGKILL";
034  private static final String SIGSTOP = "SIGSTOP";
035  private static final String SIGCONT = "SIGCONT";
036  public ZNodeClusterManager() {
037  }
038
039  private String getZKQuorumServersStringFromHbaseConfig() {
040    String port =
041      Integer.toString(getConf().getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181));
042    String[] serverHosts = getConf().getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost");
043    for (int i = 0; i < serverHosts.length; i++) {
044      serverHosts[i] = serverHosts[i] + ":" + port;
045    }
046    return Arrays.asList(serverHosts).stream().collect(Collectors.joining(","));
047  }
048
049  private String createZNode(String hostname, String cmd) throws IOException{
050    LOG.info("Zookeeper Mode enabled sending command to zookeeper + " +
051      cmd + "hostname:" + hostname);
052    ChaosZKClient chaosZKClient = new ChaosZKClient(getZKQuorumServersStringFromHbaseConfig());
053    return chaosZKClient.submitTask(new ChaosZKClient.TaskObject(cmd, hostname));
054  }
055
056  protected HBaseClusterManager.CommandProvider getCommandProvider(ServiceType service)
057    throws IOException {
058    switch (service) {
059      case HADOOP_DATANODE:
060      case HADOOP_NAMENODE:
061        return new HBaseClusterManager.HadoopShellCommandProvider(getConf());
062      case ZOOKEEPER_SERVER:
063        return new HBaseClusterManager.ZookeeperShellCommandProvider(getConf());
064      default:
065        return new HBaseClusterManager.HBaseShellCommandProvider(getConf());
066    }
067  }
068
069  public void signal(ServiceType service, String signal, String hostname) throws IOException {
070    createZNode(hostname, CmdType.exec.toString() +
071      getCommandProvider(service).signalCommand(service, signal));
072  }
073
074  private void createOpCommand(String hostname, ServiceType service,
075    HBaseClusterManager.CommandProvider.Operation op) throws IOException{
076    createZNode(hostname, CmdType.exec.toString() +
077      getCommandProvider(service).getCommand(service, op));
078  }
079
080  @Override
081  public void start(ServiceType service, String hostname, int port) throws IOException {
082    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.START);
083  }
084
085  @Override
086  public void stop(ServiceType service, String hostname, int port) throws IOException {
087    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.STOP);
088  }
089
090  @Override
091  public void restart(ServiceType service, String hostname, int port) throws IOException {
092    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.RESTART);
093  }
094
095  @Override
096  public void kill(ServiceType service, String hostname, int port) throws IOException {
097    signal(service, SIGKILL, hostname);
098  }
099
100  @Override
101  public void suspend(ServiceType service, String hostname, int port) throws IOException {
102    signal(service, SIGSTOP, hostname);
103  }
104
105  @Override
106  public void resume(ServiceType service, String hostname, int port) throws IOException {
107    signal(service, SIGCONT, hostname);
108  }
109
110  @Override
111  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
112    return Boolean.parseBoolean(createZNode(hostname, CmdType.bool.toString() +
113      getCommandProvider(service).isRunningCommand(service)));
114  }
115
116  enum CmdType {
117    exec,
118    bool
119  }
120}