001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.stream.Collectors; 023import org.apache.hadoop.conf.Configured; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028@InterfaceAudience.Private 029public class ZNodeClusterManager extends Configured implements ClusterManager { 030 private static final Logger LOG = LoggerFactory.getLogger(ZNodeClusterManager.class.getName()); 031 private static final String SIGKILL = "SIGKILL"; 032 private static final String SIGSTOP = "SIGSTOP"; 033 private static final String SIGCONT = "SIGCONT"; 034 035 public ZNodeClusterManager() { 036 } 037 038 private String getZKQuorumServersStringFromHbaseConfig() { 039 String port = Integer.toString(getConf().getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181)); 040 String[] serverHosts = getConf().getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost"); 041 for (int i = 0; i < serverHosts.length; i++) { 042 serverHosts[i] = serverHosts[i] + ":" + port; 043 } 044 return Arrays.asList(serverHosts).stream().collect(Collectors.joining(",")); 045 } 046 047 private String createZNode(String hostname, String cmd) throws IOException { 048 LOG.info( 049 "Zookeeper Mode enabled sending command to zookeeper + " + cmd + "hostname:" + hostname); 050 ChaosZKClient chaosZKClient = new ChaosZKClient(getZKQuorumServersStringFromHbaseConfig()); 051 return chaosZKClient.submitTask(new ChaosZKClient.TaskObject(cmd, hostname)); 052 } 053 054 protected HBaseClusterManager.CommandProvider getCommandProvider(ServiceType service) 055 throws IOException { 056 switch (service) { 057 case HADOOP_DATANODE: 058 case HADOOP_NAMENODE: 059 return new HBaseClusterManager.HadoopShellCommandProvider(getConf()); 060 case ZOOKEEPER_SERVER: 061 return new HBaseClusterManager.ZookeeperShellCommandProvider(getConf()); 062 default: 063 return new HBaseClusterManager.HBaseShellCommandProvider(getConf()); 064 } 065 } 066 067 public void signal(ServiceType service, String signal, String hostname) throws IOException { 068 createZNode(hostname, 069 CmdType.exec.toString() + getCommandProvider(service).signalCommand(service, signal)); 070 } 071 072 private void createOpCommand(String hostname, ServiceType service, 073 HBaseClusterManager.CommandProvider.Operation op) throws IOException { 074 createZNode(hostname, 075 CmdType.exec.toString() + getCommandProvider(service).getCommand(service, op)); 076 } 077 078 @Override 079 public void start(ServiceType service, String hostname, int port) throws IOException { 080 createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.START); 081 } 082 083 @Override 084 public void stop(ServiceType service, String hostname, int port) throws IOException { 085 createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.STOP); 086 } 087 088 @Override 089 public void restart(ServiceType service, String hostname, int port) throws IOException { 090 createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.RESTART); 091 } 092 093 @Override 094 public void kill(ServiceType service, String hostname, int port) throws IOException { 095 signal(service, SIGKILL, hostname); 096 } 097 098 @Override 099 public void suspend(ServiceType service, String hostname, int port) throws IOException { 100 signal(service, SIGSTOP, hostname); 101 } 102 103 @Override 104 public void resume(ServiceType service, String hostname, int port) throws IOException { 105 signal(service, SIGCONT, hostname); 106 } 107 108 @Override 109 public boolean isRunning(ServiceType service, String hostname, int port) throws IOException { 110 return Boolean.parseBoolean(createZNode(hostname, 111 CmdType.bool.toString() + getCommandProvider(service).isRunningCommand(service))); 112 } 113 114 enum CmdType { 115 exec, 116 bool 117 } 118}