001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase; 020 021import java.io.File; 022import java.io.IOException; 023import java.util.Locale; 024import java.util.Map; 025 026import org.apache.commons.lang3.StringUtils; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.conf.Configured; 029import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation; 030import org.apache.hadoop.hbase.util.Pair; 031import org.apache.hadoop.hbase.util.RetryCounter; 032import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; 033import org.apache.hadoop.hbase.util.RetryCounterFactory; 034import org.apache.hadoop.util.Shell; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * A default cluster manager for HBase. Uses SSH, and hbase shell scripts 041 * to manage the cluster. Assumes Unix-like commands are available like 'ps', 042 * 'kill', etc. Also assumes the user running the test has enough "power" to start & stop 043 * servers on the remote machines (for example, the test user could be the same user as the 044 * user the daemon is running as) 045 */ 046@InterfaceAudience.Private 047public class HBaseClusterManager extends Configured implements ClusterManager { 048 private static final String SIGKILL = "SIGKILL"; 049 private static final String SIGSTOP = "SIGSTOP"; 050 private static final String SIGCONT = "SIGCONT"; 051 052 protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class); 053 private String sshUserName; 054 private String sshOptions; 055 056 /** 057 * The command format that is used to execute the remote command. Arguments: 058 * 1 SSH options, 2 user name , 3 "@" if username is set, 4 host, 059 * 5 original command, 6 service user. 060 */ 061 private static final String DEFAULT_TUNNEL_CMD = 062 "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\""; 063 private String tunnelCmd; 064 065 private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts"; 066 private static final int DEFAULT_RETRY_ATTEMPTS = 5; 067 068 private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval"; 069 private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000; 070 071 protected RetryCounterFactory retryCounterFactory; 072 073 @Override 074 public void setConf(Configuration conf) { 075 super.setConf(conf); 076 if (conf == null) { 077 // Configured gets passed null before real conf. Why? I don't know. 078 return; 079 } 080 sshUserName = conf.get("hbase.it.clustermanager.ssh.user", ""); 081 String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", ""); 082 sshOptions = System.getenv("HBASE_SSH_OPTS"); 083 if (!extraSshOptions.isEmpty()) { 084 sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " "); 085 } 086 sshOptions = (sshOptions == null) ? "" : sshOptions; 087 sshUserName = (sshUserName == null) ? "" : sshUserName; 088 tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD); 089 // Print out ssh special config if any. 090 if ((sshUserName != null && sshUserName.length() > 0) || 091 (sshOptions != null && sshOptions.length() > 0)) { 092 LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]"); 093 } 094 095 this.retryCounterFactory = new RetryCounterFactory(new RetryConfig() 096 .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS)) 097 .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL))); 098 } 099 100 private String getServiceUser(ServiceType service) { 101 Configuration conf = getConf(); 102 switch (service) { 103 case HADOOP_DATANODE: 104 return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs"); 105 case ZOOKEEPER_SERVER: 106 return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper"); 107 default: 108 return conf.get("hbase.it.clustermanager.hbase.user", "hbase"); 109 } 110 } 111 112 /** 113 * Executes commands over SSH 114 */ 115 protected class RemoteShell extends Shell.ShellCommandExecutor { 116 private String hostname; 117 private String user; 118 119 public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env, 120 long timeout) { 121 super(execString, dir, env, timeout); 122 this.hostname = hostname; 123 } 124 125 public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) { 126 super(execString, dir, env); 127 this.hostname = hostname; 128 } 129 130 public RemoteShell(String hostname, String[] execString, File dir) { 131 super(execString, dir); 132 this.hostname = hostname; 133 } 134 135 public RemoteShell(String hostname, String[] execString) { 136 super(execString); 137 this.hostname = hostname; 138 } 139 140 public RemoteShell(String hostname, String user, String[] execString) { 141 super(execString); 142 this.hostname = hostname; 143 this.user = user; 144 } 145 146 @Override 147 public String[] getExecString() { 148 String at = sshUserName.isEmpty() ? "" : "@"; 149 String remoteCmd = StringUtils.join(super.getExecString(), " "); 150 String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user); 151 LOG.info("Executing full command [" + cmd + "]"); 152 return new String[] { "/usr/bin/env", "bash", "-c", cmd }; 153 } 154 155 @Override 156 public void execute() throws IOException { 157 super.execute(); 158 } 159 } 160 161 /** 162 * Provides command strings for services to be executed by Shell. CommandProviders are 163 * pluggable, and different deployments(windows, bigtop, etc) can be managed by 164 * plugging-in custom CommandProvider's or ClusterManager's. 165 */ 166 static abstract class CommandProvider { 167 168 enum Operation { 169 START, STOP, RESTART 170 } 171 172 public abstract String getCommand(ServiceType service, Operation op); 173 174 public String isRunningCommand(ServiceType service) { 175 return findPidCommand(service); 176 } 177 178 protected String findPidCommand(ServiceType service) { 179 return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", 180 service); 181 } 182 183 public String signalCommand(ServiceType service, String signal) { 184 return String.format("%s | xargs kill -s %s", findPidCommand(service), signal); 185 } 186 } 187 188 /** 189 * CommandProvider to manage the service using bin/hbase-* scripts 190 */ 191 static class HBaseShellCommandProvider extends CommandProvider { 192 private final String hbaseHome; 193 private final String confDir; 194 195 HBaseShellCommandProvider(Configuration conf) { 196 hbaseHome = conf.get("hbase.it.clustermanager.hbase.home", 197 System.getenv("HBASE_HOME")); 198 String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir", 199 System.getenv("HBASE_CONF_DIR")); 200 if (tmp != null) { 201 confDir = String.format("--config %s", tmp); 202 } else { 203 confDir = ""; 204 } 205 } 206 207 @Override 208 public String getCommand(ServiceType service, Operation op) { 209 return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir, 210 op.toString().toLowerCase(Locale.ROOT), service); 211 } 212 } 213 214 /** 215 * CommandProvider to manage the service using sbin/hadoop-* scripts. 216 */ 217 static class HadoopShellCommandProvider extends CommandProvider { 218 private final String hadoopHome; 219 private final String confDir; 220 221 HadoopShellCommandProvider(Configuration conf) throws IOException { 222 hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home", 223 System.getenv("HADOOP_HOME")); 224 String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir", 225 System.getenv("HADOOP_CONF_DIR")); 226 if (hadoopHome == null) { 227 throw new IOException("Hadoop home configuration parameter i.e. " + 228 "'hbase.it.clustermanager.hadoop.home' is not configured properly."); 229 } 230 if (tmp != null) { 231 confDir = String.format("--config %s", tmp); 232 } else { 233 confDir = ""; 234 } 235 } 236 237 @Override 238 public String getCommand(ServiceType service, Operation op) { 239 return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir, 240 op.toString().toLowerCase(Locale.ROOT), service); 241 } 242 } 243 244 /** 245 * CommandProvider to manage the service using bin/zk* scripts. 246 */ 247 static class ZookeeperShellCommandProvider extends CommandProvider { 248 private final String zookeeperHome; 249 private final String confDir; 250 251 ZookeeperShellCommandProvider(Configuration conf) throws IOException { 252 zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home", 253 System.getenv("ZOOBINDIR")); 254 String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir", 255 System.getenv("ZOOCFGDIR")); 256 if (zookeeperHome == null) { 257 throw new IOException("ZooKeeper home configuration parameter i.e. " + 258 "'hbase.it.clustermanager.zookeeper.home' is not configured properly."); 259 } 260 if (tmp != null) { 261 confDir = String.format("--config %s", tmp); 262 } else { 263 confDir = ""; 264 } 265 } 266 267 @Override 268 public String getCommand(ServiceType service, Operation op) { 269 return String.format("%s/bin/zkServer.sh %s", zookeeperHome, op.toString().toLowerCase(Locale.ROOT)); 270 } 271 272 @Override 273 protected String findPidCommand(ServiceType service) { 274 return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", 275 service); 276 } 277 } 278 279 public HBaseClusterManager() { 280 } 281 282 protected CommandProvider getCommandProvider(ServiceType service) throws IOException { 283 switch (service) { 284 case HADOOP_DATANODE: 285 return new HadoopShellCommandProvider(getConf()); 286 case ZOOKEEPER_SERVER: 287 return new ZookeeperShellCommandProvider(getConf()); 288 default: 289 return new HBaseShellCommandProvider(getConf()); 290 } 291 } 292 293 /** 294 * Execute the given command on the host using SSH 295 * @return pair of exit code and command output 296 * @throws IOException if something goes wrong. 297 */ 298 private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd) 299 throws IOException { 300 LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname); 301 302 RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd); 303 try { 304 shell.execute(); 305 } catch (Shell.ExitCodeException ex) { 306 // capture the stdout of the process as well. 307 String output = shell.getOutput(); 308 // add output for the ExitCodeException. 309 throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage() 310 + ", stdout: " + output); 311 } 312 313 LOG.info("Executed remote command, exit code:" + shell.getExitCode() 314 + " , output:" + shell.getOutput()); 315 316 return new Pair<>(shell.getExitCode(), shell.getOutput()); 317 } 318 319 private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd) 320 throws IOException { 321 RetryCounter retryCounter = retryCounterFactory.create(); 322 while (true) { 323 try { 324 return exec(hostname, service, cmd); 325 } catch (IOException e) { 326 retryOrThrow(retryCounter, e, hostname, cmd); 327 } 328 try { 329 retryCounter.sleepUntilNextRetry(); 330 } catch (InterruptedException ex) { 331 // ignore 332 LOG.warn("Sleep Interrupted:" + ex); 333 } 334 } 335 } 336 337 private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, 338 String hostname, String[] cmd) throws E { 339 if (retryCounter.shouldRetry()) { 340 LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname 341 + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: " 342 + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage()); 343 return; 344 } 345 throw ex; 346 } 347 348 private void exec(String hostname, ServiceType service, Operation op) throws IOException { 349 execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op)); 350 } 351 352 @Override 353 public void start(ServiceType service, String hostname, int port) throws IOException { 354 exec(hostname, service, Operation.START); 355 } 356 357 @Override 358 public void stop(ServiceType service, String hostname, int port) throws IOException { 359 exec(hostname, service, Operation.STOP); 360 } 361 362 @Override 363 public void restart(ServiceType service, String hostname, int port) throws IOException { 364 exec(hostname, service, Operation.RESTART); 365 } 366 367 public void signal(ServiceType service, String signal, String hostname) throws IOException { 368 execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal)); 369 } 370 371 @Override 372 public boolean isRunning(ServiceType service, String hostname, int port) throws IOException { 373 String ret = execWithRetries(hostname, service, 374 getCommandProvider(service).isRunningCommand(service)).getSecond(); 375 return ret.length() > 0; 376 } 377 378 @Override 379 public void kill(ServiceType service, String hostname, int port) throws IOException { 380 signal(service, SIGKILL, hostname); 381 } 382 383 @Override 384 public void suspend(ServiceType service, String hostname, int port) throws IOException { 385 signal(service, SIGSTOP, hostname); 386 } 387 388 @Override 389 public void resume(ServiceType service, String hostname, int port) throws IOException { 390 signal(service, SIGCONT, hostname); 391 } 392}