/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A default cluster manager for HBase. Uses SSH, and hbase shell scripts to manage the cluster.
 * Assumes Unix-like commands are available like 'ps', 'kill', etc. Also assumes the user running
 * the test has enough "power" to start and stop servers on the remote machines (for example, the
 * test user could be the same user as the user the daemon is running as).
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {

  /** Signals that can be delivered to a remote service process via {@code kill -s}. */
  protected enum Signal {
    SIGKILL,
    SIGSTOP,
    SIGCONT,
  }

  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format that is used to execute the remote command. Arguments: 1 SSH options, 2
   * user name, 3 "@" if username is set, 4 host, 5 original command, 6 service user.
   */
  private static final String DEFAULT_TUNNEL_CMD =
    "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  private String tunnelCmd;

  /**
   * The command format that is used to execute the remote command with sudo. Arguments: 1 SSH
   * options, 2 user name, 3 "@" if username is set, 4 host, 5 original command, 6 timeout.
   */
  private static final String DEFAULT_TUNNEL_SUDO_CMD =
    "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
  private String tunnelSudoCmd;

  static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  static final int DEFAULT_RETRY_ATTEMPTS = 5;

  static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
  static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  /**
   * Stores the configuration and derives the SSH user, SSH options, tunnel command formats and
   * the retry policy from it.
   * @param conf the configuration to read from; when {@code null} only the superclass is updated
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // Configured gets passed null before real conf. Why? I don't know.
      return;
    }
    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "-o ConnectTimeout=10");
    String envSshOptions = System.getenv("HBASE_SSH_OPTS");
    // Combine the environment-provided options with the configured ones, skipping whichever side
    // is missing so the result never carries a stray leading or trailing separator.
    if (envSshOptions == null || envSshOptions.isEmpty()) {
      sshOptions = extraSshOptions;
    } else if (extraSshOptions.isEmpty()) {
      sshOptions = envSshOptions;
    } else {
      sshOptions = envSshOptions + " " + extraSshOptions;
    }
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
    // Print out ssh special config if any.
    if (!sshUserName.isEmpty() || !sshOptions.isEmpty()) {
      LOG.info("Running with SSH user [{}] and options [{}]", sshUserName, sshOptions);
    }

    this.retryCounterFactory = new RetryCounterFactory(
      new RetryConfig().setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

  /**
   * Looks up the user name passed to {@code sudo -u} when running commands for the given service.
   * @param service the service type being managed
   * @return the configured user, defaulting to "hdfs" for the Hadoop datanode/namenode,
   *         "zookeeper" for the ZooKeeper server and "hbase" for everything else
   */
  protected String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands over SSH, wrapping the original command in the configured tunnel command
   * format so that it runs on the remote host under {@code sudo -u <service user>}.
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private final String hostname;
    // NOTE(review): only the (hostname, user, execString) constructor sets this. With the other
    // constructors it stays null and String.format renders it as the literal text "null" in the
    // tunnel command - confirm callers of those constructors supply a tunnel format that does
    // not use argument 6.
    private final String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
      long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
      this.user = null;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
      this.user = null;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
      this.user = null;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = null;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    /**
     * Builds the local command line: the original command joined with spaces, substituted into
     * the tunnel command format and handed to {@code bash -c}.
     */
    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Executes commands over SSH with plain {@code sudo}, using the sudo tunnel command format.
   */
  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
    private final String hostname;

    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
      this(hostname, execString, null, null, timeout);
    }

    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
      long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    /**
     * Builds the local command line: the original command joined with spaces, substituted into
     * the sudo tunnel command format and handed to {@code bash -c}.
     */
    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      // The executor's timeout interval is divided by 1000 before being passed as the argument
      // of the remote `timeout` command (presumably milliseconds to seconds - confirm).
      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
        timeOutInterval / 1000f);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are
   * pluggable, and different deployments (windows, bigtop, etc) can be managed by plugging-in
   * custom CommandProvider's or ClusterManager's.
   */
  static abstract class CommandProvider {

    enum Operation {
      START,
      STOP,
      RESTART
    }

    /**
     * @return the shell command that performs {@code op} on {@code service}
     */
    public abstract String getCommand(ServiceType service, Operation op);

    /**
     * @return a command whose output is non-empty when the service process is found
     */
    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

    /**
     * @return a {@code ps}/{@code grep} pipeline printing the second column (the PID column of
     *         {@code ps ux}) of processes whose listing contains {@code proc_<service>}
     */
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
        service);
    }

    /**
     * @return a command printing the {@code ps} state of the service's process(es)
     */
    protected String getStateCommand(ServiceType service) {
      return String.format("%s | xargs ps -o state= -p ", findPidCommand(service));
    }

    /**
     * @return a command that sends {@code signal} to the service's process(es) via {@code kill}
     */
    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider to manage the service using bin/hbase-* scripts
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home", System.getenv("HBASE_HOME"));
      String tmp =
        conf.get("hbase.it.clustermanager.hbase.conf.dir", System.getenv("HBASE_CONF_DIR"));
      // Only pass --config when a configuration directory is known.
      confDir = (tmp != null) ? String.format("--config %s", tmp) : "";
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
        op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using sbin/hadoop-* scripts.
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    /**
     * @throws IOException if no Hadoop home is available from the configuration or environment
     */
    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home", System.getenv("HADOOP_HOME"));
      String tmp =
        conf.get("hbase.it.clustermanager.hadoop.conf.dir", System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
        throw new IOException("Hadoop home configuration parameter i.e. "
          + "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
      }
      // Only pass --config when a configuration directory is known.
      confDir = (tmp != null) ? String.format("--config %s", tmp) : "";
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
        op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using bin/zk* scripts.
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;

    /**
     * @throws IOException if no ZooKeeper home is available from the configuration or environment
     */
    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      zookeeperHome =
        conf.get("hbase.it.clustermanager.zookeeper.home", System.getenv("ZOOBINDIR"));
      if (zookeeperHome == null) {
        throw new IOException("ZooKeeper home configuration parameter i.e. "
          + "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s", zookeeperHome,
        op.toString().toLowerCase(Locale.ROOT));
    }

    /**
     * Same pipeline as the base class, but greps for the bare service name rather than
     * {@code proc_<service>}.
     */
    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", service);
    }
  }

  public HBaseClusterManager() {
  }

  /**
   * Selects the command provider for a service: the Hadoop provider for datanode/namenode, the
   * ZooKeeper provider for the ZooKeeper server, and otherwise the class configured under
   * "hbase.it.clustermanager.hbase.command.provider" (default {@link HBaseShellCommandProvider}).
   * @throws IOException if the selected provider cannot be constructed
   */
  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        Class<? extends CommandProvider> provider =
          getConf().getClass("hbase.it.clustermanager.hbase.command.provider",
            HBaseShellCommandProvider.class, CommandProvider.class);
        return ReflectionUtils.newInstance(provider, getConf());
    }
  }

  /**
   * Execute the given command on the host using SSH
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "), hostname);
    return runShell(new RemoteShell(hostname, getServiceUser(service), cmd));
  }

  /**
   * Runs {@link #exec(String, ServiceType, String...)} until it succeeds or the retry counter is
   * exhausted, sleeping between attempts.
   * @throws IOException the last failure once retries are exhausted, or an
   *                     {@link InterruptedIOException} if the thread is interrupted while waiting
   */
  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
    throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return exec(hostname, service, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      sleepBeforeRetry(retryCounter, hostname, cmd);
    }
  }

  /**
   * Execute the given command on the host using SSH
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "), hostname);
    return runShell(new RemoteSudoShell(hostname, cmd, timeout));
  }

  /**
   * Runs {@link #execSudo(String, long, String...)} until it succeeds or the retry counter is
   * exhausted, sleeping between attempts.
   * @return pair of exit code and command output
   * @throws IOException the last failure once retries are exhausted, or an
   *                     {@link InterruptedIOException} if the thread is interrupted while waiting
   */
  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
    throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return execSudo(hostname, timeout, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      sleepBeforeRetry(retryCounter, hostname, cmd);
    }
  }

  /**
   * Executes the prepared shell and returns its exit code and output. A non-zero exit is
   * re-thrown with the captured stdout appended to the message and the original exception kept
   * as the cause.
   */
  private Pair<Integer, String> runShell(Shell.ShellCommandExecutor shell) throws IOException {
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException.
      Shell.ExitCodeException enriched = new Shell.ExitCodeException(ex.getExitCode(),
        "stderr: " + ex.getMessage() + ", stdout: " + output);
      enriched.initCause(ex);
      throw enriched;
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
      shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  /**
   * Waits for the retry counter's back-off before the next attempt. If the thread is interrupted
   * the interrupt status is restored and the retry loop is aborted, rather than silently
   * continuing to issue remote commands.
   */
  private void sleepBeforeRetry(RetryCounter retryCounter, String hostname, String[] cmd)
    throws InterruptedIOException {
    try {
      retryCounter.sleepUntilNextRetry();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      InterruptedIOException interrupted =
        new InterruptedIOException("Interrupted while waiting to retry remote command: "
          + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
      interrupted.initCause(ex);
      throw interrupted;
    }
  }

  /**
   * Logs the failure and returns if another attempt is allowed; otherwise re-throws {@code ex}.
   */
  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, String hostname,
    String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn(
        "Remote command: {} , hostname:{} failed at attempt {}. Retrying until maxAttempts: {}."
          + " Exception: {}",
        StringUtils.join(cmd, " "), hostname, retryCounter.getAttemptTimes(),
        retryCounter.getMaxAttempts(), ex.getMessage());
      return;
    }
    throw ex;
  }

  /** Runs the provider's command for {@code op} on the host, with retries. */
  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  /**
   * Sends the given signal to the service's process(es) on the host, with retries.
   */
  public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
    execWithRetries(hostname, service,
      getCommandProvider(service).signalCommand(service, signal.toString()));
  }

  /**
   * @return true if the provider's is-running command produced any output
   */
  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret =
      execWithRetries(hostname, service, getCommandProvider(service).isRunningCommand(service))
        .getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGCONT, hostname);
  }

  /**
   * @return true if the trimmed output of the provider's state command is exactly "T"
   */
  public boolean isSuspended(ServiceType service, String hostname, int port) throws IOException {
    String ret =
      execWithRetries(hostname, service, getCommandProvider(service).getStateCommand(service))
        .getSecond();
    return ret != null && ret.trim().equals("T");
  }

  /**
   * @return true if the state command produced output whose trimmed value is anything other than
   *         "T"
   */
  public boolean isResumed(ServiceType service, String hostname, int port) throws IOException {
    String ret =
      execWithRetries(hostname, service, getCommandProvider(service).getStateCommand(service))
        .getSecond();
    return ret != null && !ret.trim().equals("T");
  }
}