/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A default cluster manager for HBase. Uses SSH, and hbase shell scripts to manage the cluster.
 * Assumes Unix-like commands are available like 'ps', 'kill', etc. Also assumes the user running
 * the test has enough "power" to start and stop servers on the remote machines (for example, the
 * test user could be the same user as the user the daemon is running as).
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {

  /** Signals that {@link #signal(ServiceType, Signal, String)} can deliver to a service. */
  protected enum Signal {
    SIGKILL,
    SIGSTOP,
    SIGCONT,
  }

  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format that is used to execute the remote command. Arguments: 1 SSH options, 2 user
   * name , 3 "@" if username is set, 4 host, 5 original command, 6 service user.
   */
  private static final String DEFAULT_TUNNEL_CMD =
    "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  private String tunnelCmd;

  /**
   * The command format that is used to execute the remote command with sudo. Arguments: 1 SSH
   * options, 2 user name , 3 "@" if username is set, 4 host, 5 original command, 6 timeout.
   */
  private static final String DEFAULT_TUNNEL_SUDO_CMD =
    "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
  private String tunnelSudoCmd;

  static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  static final int DEFAULT_RETRY_ATTEMPTS = 5;

  static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
  static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  /**
   * Stores the configuration and derives the SSH user, SSH options, tunnel command formats and
   * the retry policy from it. A {@code null} configuration is stored and otherwise ignored.
   * @param conf the configuration to read the {@code hbase.it.clustermanager.*} keys from
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // Configured gets passed null before real conf. Why? I don't know.
      return;
    }
    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
    sshOptions = System.getenv("HBASE_SSH_OPTS");
    if (!extraSshOptions.isEmpty()) {
      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
    }
    // Normalize to non-null so the rest of the class can use these values unguarded.
    sshOptions = (sshOptions == null) ? "" : sshOptions;
    sshUserName = (sshUserName == null) ? "" : sshUserName;
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
    // Print out ssh special config if any.
    if (!sshUserName.isEmpty() || !sshOptions.isEmpty()) {
      LOG.info("Running with SSH user [{}] and options [{}]", sshUserName, sshOptions);
    }

    this.retryCounterFactory = new RetryCounterFactory(
      new RetryConfig().setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

  /**
   * Returns the user that remote commands for the given service are run as (argument 6 of the
   * tunnel command format).
   * @param service the service being managed
   * @return the configured user, defaulting to "hdfs" for the HDFS services, "zookeeper" for
   *         ZooKeeper and "hbase" for everything else
   */
  protected String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands over SSH, wrapping the original command with the configured tunnel command
   * format.
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private final String hostname;
    // Only set by the (hostname, user, execString) constructor; null otherwise, in which case the
    // literal text "null" is substituted for argument 6 of the tunnel command format.
    private String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
      long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    /**
     * Builds the local command line: the original command joined with spaces, substituted into
     * the tunnel command format and handed to {@code bash -c}.
     */
    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Executes commands over SSH with sudo, wrapping the original command with the configured sudo
   * tunnel command format.
   */
  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
    private final String hostname;

    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
      this(hostname, execString, null, null, timeout);
    }

    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
      long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    /**
     * Builds the local command line: the original command joined with spaces, substituted into
     * the sudo tunnel command format and handed to {@code bash -c}.
     */
    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      // timeOutInterval is divided by 1000 before being passed as the "timeout" argument;
      // presumably this converts milliseconds to seconds -- confirm against Shell.
      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
        timeOutInterval / 1000f);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are pluggable,
   * and different deployments(windows, bigtop, etc) can be managed by plugging-in custom
   * CommandProvider's or ClusterManager's.
   */
  abstract static class CommandProvider {

    enum Operation {
      START,
      STOP,
      RESTART
    }

    /** Returns the shell command that performs {@code op} on {@code service}. */
    public abstract String getCommand(ServiceType service, Operation op);

    /** Returns the shell command whose output is non-empty when the service is running. */
    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

    /** Returns a shell pipeline that prints the second {@code ps ux} column of matching rows. */
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
        service);
    }

    /** Returns a shell pipeline that sends {@code signal} to the pids found for the service. */
    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs sudo kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider to manage the service using bin/hbase-* scripts
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home", System.getenv("HBASE_HOME"));
      String tmp =
        conf.get("hbase.it.clustermanager.hbase.conf.dir", System.getenv("HBASE_CONF_DIR"));
      confDir = (tmp != null) ? String.format("--config %s", tmp) : "";
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
        op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using sbin/hadoop-* scripts.
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    /**
     * @throws IOException if neither the configuration key nor HADOOP_HOME yields a Hadoop home
     */
    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home", System.getenv("HADOOP_HOME"));
      String tmp =
        conf.get("hbase.it.clustermanager.hadoop.conf.dir", System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
        throw new IOException("Hadoop home configuration parameter i.e. "
          + "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
      }
      confDir = (tmp != null) ? String.format("--config %s", tmp) : "";
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
        op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using bin/zk* scripts.
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;

    /**
     * @throws IOException if neither the configuration key nor ZOOBINDIR yields a ZooKeeper home
     */
    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      zookeeperHome =
        conf.get("hbase.it.clustermanager.zookeeper.home", System.getenv("ZOOBINDIR"));
      if (zookeeperHome == null) {
        throw new IOException("ZooKeeper home configuration parameter i.e. "
          + "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s", zookeeperHome,
        op.toString().toLowerCase(Locale.ROOT));
    }

    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", service);
    }
  }

  public HBaseClusterManager() {
  }

  /**
   * Selects the command provider for a service: the Hadoop provider for the HDFS services, the
   * ZooKeeper provider for ZooKeeper, and otherwise the class configured under
   * {@code hbase.it.clustermanager.hbase.command.provider} (default
   * {@link HBaseShellCommandProvider}).
   * @throws IOException if the Hadoop or ZooKeeper provider cannot determine its home directory
   */
  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        Class<? extends CommandProvider> provider =
          getConf().getClass("hbase.it.clustermanager.hbase.command.provider",
            HBaseShellCommandProvider.class, CommandProvider.class);
        return ReflectionUtils.newInstance(provider, getConf());
    }
  }

  /**
   * Execute the given command on the host using SSH
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "), hostname);
    return runShell(new RemoteShell(hostname, getServiceUser(service), cmd));
  }

  /**
   * Calls {@link #exec(String, ServiceType, String...)} until it succeeds or the retry counter is
   * exhausted, sleeping between attempts.
   * @throws IOException the last failure once retries are exhausted, or an
   *                     {@link InterruptedIOException} if interrupted while waiting to retry
   */
  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
    throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return exec(hostname, service, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      sleepBeforeRetry(retryCounter);
    }
  }

  /**
   * Execute the given command on the host using SSH
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "), hostname);
    return runShell(new RemoteSudoShell(hostname, cmd, timeout));
  }

  /**
   * Calls {@link #execSudo(String, long, String...)} until it succeeds or the retry counter is
   * exhausted, sleeping between attempts.
   * @return pair of exit code and command output
   * @throws IOException the last failure once retries are exhausted, or an
   *                     {@link InterruptedIOException} if interrupted while waiting to retry
   */
  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
    throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return execSudo(hostname, timeout, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      sleepBeforeRetry(retryCounter);
    }
  }

  /**
   * Runs the shell and returns its exit code and output. A non-zero exit is rethrown with the
   * captured stdout appended to the message and the original exception attached as the cause.
   */
  private Pair<Integer, String> runShell(Shell.ShellCommandExecutor shell) throws IOException {
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException, keeping the original as the cause.
      Shell.ExitCodeException withOutput = new Shell.ExitCodeException(ex.getExitCode(),
        "stderr: " + ex.getMessage() + ", stdout: " + output);
      withOutput.initCause(ex);
      throw withOutput;
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
      shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  /**
   * Sleeps until the next retry is due. If the thread is interrupted while sleeping, the
   * interrupt status is restored and the retry loop is aborted instead of silently continuing.
   */
  private void sleepBeforeRetry(RetryCounter retryCounter) throws InterruptedIOException {
    try {
      retryCounter.sleepUntilNextRetry();
    } catch (InterruptedException ex) {
      LOG.warn("Sleep Interrupted:", ex);
      Thread.currentThread().interrupt();
      InterruptedIOException interrupted =
        new InterruptedIOException("Interrupted while waiting to retry remote command");
      interrupted.initCause(ex);
      throw interrupted;
    }
  }

  /**
   * Logs the failure and returns if another attempt is allowed; otherwise rethrows {@code ex}.
   */
  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, String hostname,
    String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn(
        "Remote command: {} , hostname:{} failed at attempt {}. Retrying until maxAttempts: {}."
          + " Exception: {}",
        StringUtils.join(cmd, " "), hostname, retryCounter.getAttemptTimes(),
        retryCounter.getMaxAttempts(), ex.getMessage());
      return;
    }
    throw ex;
  }

  /** Runs, with retries, the provider's command for {@code op} on the given host. */
  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  /**
   * Sends the given signal to the service's processes on the host, with retries.
   * @throws IOException if the remote command keeps failing or the provider cannot be created
   */
  public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
    execWithRetries(hostname, service,
      getCommandProvider(service).signalCommand(service, signal.toString()));
  }

  /**
   * Reports the service as running when the provider's is-running command produces any output.
   */
  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret =
      execWithRetries(hostname, service, getCommandProvider(service).isRunningCommand(service))
        .getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGCONT, hostname);
  }
}