001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.File; 021import java.io.IOException; 022import java.util.Locale; 023import java.util.Map; 024import org.apache.commons.lang3.StringUtils; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.conf.Configured; 027import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation; 028import org.apache.hadoop.hbase.util.Pair; 029import org.apache.hadoop.hbase.util.ReflectionUtils; 030import org.apache.hadoop.hbase.util.RetryCounter; 031import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; 032import org.apache.hadoop.hbase.util.RetryCounterFactory; 033import org.apache.hadoop.util.Shell; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * A default cluster manager for HBase. Uses SSH, and hbase shell scripts to manage the cluster. 040 * Assumes Unix-like commands are available like 'ps', 'kill', etc. 
Also assumes the user running the test has enough "power" to start & stop servers on the remote
 * machines (for example, the test user could be the same user as the user the daemon is running
 * as).
 * <p>
 * Each remote command is wrapped in an SSH command template (configurable through
 * {@code hbase.it.clustermanager.ssh.cmd} and {@code hbase.it.clustermanager.ssh.sudo.cmd}).
 * Service operations are retried according to {@code hbase.it.clustermanager.retry.attempts} and
 * {@code hbase.it.clustermanager.retry.sleep.interval}.
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {

  /** Signals that {@link #signal} delivers to a service's processes via {@code kill -s}. */
  protected enum Signal {
    SIGKILL,
    SIGSTOP,
    SIGCONT,
  }

  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);

  /** SSH login user; empty when none is configured, in which case no "user@" prefix is used. */
  private String sshUserName;

  /** Options passed to ssh: HBASE_SSH_OPTS followed by hbase.it.clustermanager.ssh.opts. */
  private String sshOptions;

  /**
   * The command format that is used to execute the remote command. Arguments: 1 SSH options,
   * 2 user name, 3 "@" if username is set, 4 host, 5 original command, 6 service user.
   */
  private static final String DEFAULT_TUNNEL_CMD =
    "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  private String tunnelCmd;

  /**
   * The command format that is used to execute the remote command with sudo. Arguments: 1 SSH
   * options, 2 user name, 3 "@" if username is set, 4 host, 5 original command, 6 timeout in
   * seconds (derived from the shell's millisecond timeout).
   */
  private static final String DEFAULT_TUNNEL_SUDO_CMD =
    "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
  private String tunnelSudoCmd;

  static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  static final int DEFAULT_RETRY_ATTEMPTS = 5;

  static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
  static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  /**
   * Reads the SSH, command-template and retry settings from {@code conf}.
   * @param conf the configuration; {@code null} is accepted and ignored
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // Hadoop's Configured no-arg constructor (invoked implicitly by ours) calls setConf(null)
      // before the real configuration is supplied, so there is nothing to read yet.
      return;
    }
    String user = conf.get("hbase.it.clustermanager.ssh.user", "");
    sshUserName = (user == null) ? "" : user;
    sshOptions = joinSshOptions(System.getenv("HBASE_SSH_OPTS"),
      conf.get("hbase.it.clustermanager.ssh.opts", ""));
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
    // Print out ssh special config if any.
    if (!sshUserName.isEmpty() || !sshOptions.isEmpty()) {
      LOG.info("Running with SSH user [{}] and options [{}]", sshUserName, sshOptions);
    }

    this.retryCounterFactory = new RetryCounterFactory(
      new RetryConfig().setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

  /**
   * Joins the environment's SSH options with the configured extra options, separated by a single
   * space. Null or empty parts are skipped so no stray separator ends up in the ssh command line.
   * @return the combined options, never {@code null}
   */
  private static String joinSshOptions(String envOptions, String extraOptions) {
    if (StringUtils.isEmpty(envOptions)) {
      return StringUtils.isEmpty(extraOptions) ? "" : extraOptions;
    }
    return StringUtils.isEmpty(extraOptions) ? envOptions : envOptions + " " + extraOptions;
  }

  /**
   * Returns the OS user that owns the given service on the remote host: the HDFS user for
   * datanode/namenode, the ZooKeeper user for ZooKeeper, and the HBase user for everything else.
   */
  protected String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands over SSH. The local command line is built from {@code tunnelCmd}, which by
   * default runs the command on the remote host as {@code user} via {@code sudo -u}.
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private String hostname;
    private String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
      long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    /** Wraps the original command in the SSH template and runs it through {@code bash -c}. */
    @Override
    public String[] getExecString() {
      // NOTE(review): only the (hostname, user, execString) constructor sets user. With the other
      // constructors, argument 6 of the template (the default's "sudo -u %6$s") formats as
      // "null".
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Executes commands over SSH with sudo, using {@code tunnelSudoCmd}. The shell's timeout is
   * passed to the template in seconds.
   */
  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
    private String hostname;

    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
      this(hostname, execString, null, null, timeout);
    }

    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
      long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    /** Wraps the original command in the sudo SSH template and runs it through {@code bash -c}. */
    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
        timeOutInterval / 1000f);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are pluggable,
   * and different deployments (windows, bigtop, etc.) can be managed by plugging in custom
   * CommandProviders or ClusterManagers.
   */
  static abstract class CommandProvider {

    enum Operation {
      START,
      STOP,
      RESTART
    }

    /** Returns the shell command that performs {@code op} on {@code service}. */
    public abstract String getCommand(ServiceType service, Operation op);

    /** Returns a command whose output is non-empty iff the service appears to be running. */
    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

    /** Returns a pipeline printing the PIDs of the current user's processes for the service. */
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
        service);
    }

    /** Returns a command that sends {@code signal} to every PID found for the service. */
    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs sudo kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider to manage the service using bin/hbase-* scripts.
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home", System.getenv("HBASE_HOME"));
      String tmp =
        conf.get("hbase.it.clustermanager.hbase.conf.dir", System.getenv("HBASE_CONF_DIR"));
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
        op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using sbin/hadoop-* scripts.
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home", System.getenv("HADOOP_HOME"));
      String tmp =
        conf.get("hbase.it.clustermanager.hadoop.conf.dir", System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
        throw new IOException("Hadoop home configuration parameter i.e. "
          + "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
        op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using bin/zk* scripts.
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;
    private final String confDir;

    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      // NOTE(review): the fallback is ZOOBINDIR, yet getCommand appends "/bin/zkServer.sh". If
      // ZOOBINDIR names the bin directory itself, this resolves to .../bin/bin; please confirm.
      zookeeperHome =
        conf.get("hbase.it.clustermanager.zookeeper.home", System.getenv("ZOOBINDIR"));
      String tmp =
        conf.get("hbase.it.clustermanager.zookeeper.conf.dir", System.getenv("ZOOCFGDIR"));
      if (zookeeperHome == null) {
        throw new IOException("ZooKeeper home configuration parameter i.e. "
          + "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    /**
     * Builds {@code <zookeeper home>/bin/zkServer.sh [--config <dir>] <op>}. The configured conf
     * dir is passed explicitly because the command runs on a remote host, where the local
     * ZOOCFGDIR environment is not available.
     */
    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s %s", zookeeperHome, confDir,
        op.toString().toLowerCase(Locale.ROOT));
    }

    /** ZooKeeper processes are matched by service name alone (no "proc_" prefix). */
    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", service);
    }
  }

  public HBaseClusterManager() {
  }

  /**
   * Returns the CommandProvider for a service. Hadoop and ZooKeeper services use the built-in
   * providers; HBase services use the class configured under
   * {@code hbase.it.clustermanager.hbase.command.provider}, which defaults to
   * {@link HBaseShellCommandProvider}.
   * @throws IOException if the provider's required home directory is not configured
   */
  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        Class<? extends CommandProvider> provider =
          getConf().getClass("hbase.it.clustermanager.hbase.command.provider",
            HBaseShellCommandProvider.class, CommandProvider.class);
        return ReflectionUtils.newInstance(provider, getConf());
    }
  }

  /**
   * Runs {@code shell} and returns its exit code and stdout. A non-zero exit is rethrown as an
   * {@link Shell.ExitCodeException} whose message includes both stderr and stdout. The original
   * exception is kept as the cause.
   */
  private static Pair<Integer, String> runAndCapture(Shell.ShellCommandExecutor shell)
    throws IOException {
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // The exception message only carries stderr; attach stdout too so failures are diagnosable.
      Shell.ExitCodeException withOutput = new Shell.ExitCodeException(ex.getExitCode(),
        "stderr: " + ex.getMessage() + ", stdout: " + shell.getOutput());
      withOutput.initCause(ex);
      throw withOutput;
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
      shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  /**
   * Execute the given command on the host using SSH, as the user that owns {@code service}.
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "), hostname);
    return runAndCapture(new RemoteShell(hostname, getServiceUser(service), cmd));
  }

  /**
   * Like {@link #exec(String, ServiceType, String...)}, retrying failures according to
   * {@link #retryCounterFactory}.
   * @throws java.io.InterruptedIOException if interrupted while waiting between attempts
   */
  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
    throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return exec(hostname, service, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      sleepBeforeRetry(retryCounter, hostname, cmd);
    }
  }

  /**
   * Execute the given command on the host using SSH with sudo.
   * @param timeout shell timeout in milliseconds; passed to the SSH template in seconds
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "), hostname);
    return runAndCapture(new RemoteSudoShell(hostname, cmd, timeout));
  }

  /**
   * Like {@link #execSudo(String, long, String...)}, retrying failures according to
   * {@link #retryCounterFactory}.
   * @throws java.io.InterruptedIOException if interrupted while waiting between attempts
   */
  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
    throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return execSudo(hostname, timeout, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      sleepBeforeRetry(retryCounter, hostname, cmd);
    }
  }

  /**
   * Sleeps until the next retry. If interrupted, it restores the thread's interrupt status and
   * gives up with an {@link java.io.InterruptedIOException}. Swallowing the interrupt would keep
   * retrying against the remote host after the caller asked this thread to stop.
   */
  private static void sleepBeforeRetry(RetryCounter retryCounter, String hostname, String[] cmd)
    throws java.io.InterruptedIOException {
    try {
      retryCounter.sleepUntilNextRetry();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      java.io.InterruptedIOException iioe = new java.io.InterruptedIOException(
        "Interrupted while waiting to retry remote command: " + StringUtils.join(cmd, " ")
          + " , hostname:" + hostname);
      iioe.initCause(ex);
      throw iioe;
    }
  }

  /**
   * Logs and returns if {@code retryCounter} allows another attempt; otherwise rethrows {@code ex}.
   */
  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, String hostname,
    String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn(
        "Remote command: {} , hostname:{} failed at attempt {}. Retrying until maxAttempts: {}."
          + " Exception: {}",
        StringUtils.join(cmd, " "), hostname, retryCounter.getAttemptTimes(),
        retryCounter.getMaxAttempts(), ex.getMessage());
      return;
    }
    throw ex;
  }

  /** Runs the provider's command for {@code op} on {@code service}, with retries. */
  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  /** Sends {@code signal} to every process of {@code service} found on {@code hostname}. */
  public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
    execWithRetries(hostname, service,
      getCommandProvider(service).signalCommand(service, signal.toString()));
  }

  /** Returns true when the provider's running-check command prints anything (e.g. a PID). */
  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret =
      execWithRetries(hostname, service, getCommandProvider(service).isRunningCommand(service))
        .getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGCONT, hostname);
  }
}