/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A default cluster manager for HBase. Uses SSH, and hbase shell scripts
 * to manage the cluster. Assumes Unix-like commands are available like 'ps',
 * 'kill', etc. Also assumes the user running the test has enough "power" to start and stop
 * servers on the remote machines (for example, the test user could be the same user as the
 * user the daemon is running as)
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {

  /** Signals that can be delivered to a remote service process via {@code kill -s}. */
  protected enum Signal {
    SIGKILL,
    SIGSTOP,
    SIGCONT,
  }

  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format that is used to execute the remote command. Arguments:
   * 1 SSH options, 2 user name , 3 "@" if username is set, 4 host,
   * 5 original command, 6 service user.
   */
  private static final String DEFAULT_TUNNEL_CMD =
      "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  private String tunnelCmd;

  /**
   * The command format that is used to execute the remote command with sudo. Arguments:
   * 1 SSH options, 2 user name , 3 "@" if username is set, 4 host,
   * 5 original command, 6 timeout.
   */
  private static final String DEFAULT_TUNNEL_SUDO_CMD =
      "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
  private String tunnelSudoCmd;

  static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  static final int DEFAULT_RETRY_ATTEMPTS = 5;

  static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
  static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  /**
   * Stores the configuration and derives the SSH user, SSH options, tunnel command formats and
   * the retry policy from it. A {@code null} configuration is stored but otherwise ignored.
   * @param conf the configuration to read the cluster manager settings from, may be null
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // Configured gets passed null before real conf. Why? I don't know.
      return;
    }
    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
    // Options from the environment come first; configured options are appended after a space.
    sshOptions = System.getenv("HBASE_SSH_OPTS");
    if (!extraSshOptions.isEmpty()) {
      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
    }
    sshOptions = (sshOptions == null) ? "" : sshOptions;
    sshUserName = (sshUserName == null) ? "" : sshUserName;
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
    // Print out ssh special config if any. Both values are non-null at this point.
    if (!sshUserName.isEmpty() || !sshOptions.isEmpty()) {
      LOG.info("Running with SSH user [{}] and options [{}]", sshUserName, sshOptions);
    }

    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

  /**
   * Looks up the remote user that commands for the given service are run as
   * (the {@code sudo -u} argument of the tunnel command).
   * @param service the service the command targets
   * @return the configured user, defaulting to "hdfs", "zookeeper" or "hbase"
   */
  protected String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands over SSH, wrapping the original command in the configured tunnel command
   * so that it runs on the remote host as the service user.
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private final String hostname;
    // NOTE(review): only the (hostname, user, execString) constructor sets this. With the other
    // constructors it stays null and is formatted into the tunnel command as the text "null".
    private String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    /**
     * Builds the local command line: the original command joined with spaces and substituted
     * into the tunnel command format, run through {@code bash -c}.
     */
    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Executes commands over SSH with sudo, wrapping the original command in the configured
   * sudo tunnel command and passing the shell timeout (in seconds) to it.
   */
  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
    private final String hostname;

    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
      this(hostname, execString, null, null, timeout);
    }

    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    /**
     * Builds the local command line: the original command joined with spaces and substituted
     * into the sudo tunnel command format, run through {@code bash -c}.
     */
    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      // timeOutInterval is divided by 1000 to hand the timeout to the command in seconds.
      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
          timeOutInterval/1000f);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are
   * pluggable, and different deployments(windows, bigtop, etc) can be managed by
   * plugging-in custom CommandProvider's or ClusterManager's.
   */
  static abstract class CommandProvider {

    enum Operation {
      START, STOP, RESTART
    }

    /**
     * @return the shell command that performs {@code op} on {@code service}
     */
    public abstract String getCommand(ServiceType service, Operation op);

    /**
     * @return a shell command whose output is non-empty when the service is running; by
     *         default this is the pid lookup command
     */
    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

    /**
     * @return a shell pipeline that prints the second column of {@code ps ux} for every line
     *         matching {@code proc_<service>}
     */
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
          service);
    }

    /**
     * @return a shell pipeline that feeds the output of the pid lookup to
     *         {@code kill -s <signal>}
     */
    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider to manage the service using bin/hbase-* scripts
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    // NOTE(review): unlike the Hadoop and ZooKeeper providers, a missing home is not rejected
    // here; a null value ends up in the command as the text "null".
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
        System.getenv("HBASE_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
        System.getenv("HBASE_CONF_DIR"));
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
          op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using sbin/hadoop-* scripts.
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    /**
     * @throws IOException if neither the configuration nor HADOOP_HOME supplies a Hadoop home
     */
    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home",
        System.getenv("HADOOP_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir",
        System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
        throw new IOException("Hadoop home configuration parameter i.e. " +
          "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
          op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using bin/zk* scripts.
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;
    // NOTE(review): computed but not used by getCommand below.
    private final String confDir;

    /**
     * @throws IOException if neither the configuration nor ZOOBINDIR supplies a ZooKeeper home
     */
    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home",
        System.getenv("ZOOBINDIR"));
      String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir",
        System.getenv("ZOOCFGDIR"));
      if (zookeeperHome == null) {
        throw new IOException("ZooKeeper home configuration parameter i.e. " +
          "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s", zookeeperHome,
          op.toString().toLowerCase(Locale.ROOT));
    }

    /**
     * Same pipeline as the default, but matches on the bare service string instead of
     * {@code proc_<service>}.
     */
    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
        service);
    }
  }

  public HBaseClusterManager() {
  }

  /**
   * Creates the command provider matching the given service type.
   * @throws IOException if the provider cannot be constructed (e.g. missing home directory)
   */
  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        return new HBaseShellCommandProvider(getConf());
    }
  }

  /**
   * Execute the given command on the host using SSH
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "),
        hostname);

    RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
    return runShell(shell);
  }

  /**
   * Runs {@link #exec(String, ServiceType, String...)} until it succeeds or the retry counter
   * is exhausted, sleeping between attempts. An interrupt received while sleeping does not stop
   * the retries, but the thread's interrupt status is restored before this method exits.
   * @return pair of exit code and command output of the successful attempt
   * @throws IOException the failure of the last attempt once no retries are left
   */
  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
      throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return exec(hostname, service, cmd);
        } catch (IOException e) {
          retryOrThrow(retryCounter, e, hostname, cmd);
        }
        interrupted |= sleepInterrupted(retryCounter);
      }
    } finally {
      if (interrupted) {
        // Do not hide the interrupt from the caller.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Execute the given command on the host using SSH
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
        hostname);

    RemoteSudoShell shell = new RemoteSudoShell(hostname, cmd, timeout);
    return runShell(shell);
  }

  /**
   * Runs {@link #execSudo(String, long, String...)} until it succeeds or the retry counter is
   * exhausted, sleeping between attempts. An interrupt received while sleeping does not stop
   * the retries, but the thread's interrupt status is restored before this method exits.
   * @return pair of exit code and command output of the successful attempt
   * @throws IOException the failure of the last attempt once no retries are left
   */
  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
      throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return execSudo(hostname, timeout, cmd);
        } catch (IOException e) {
          retryOrThrow(retryCounter, e, hostname, cmd);
        }
        interrupted |= sleepInterrupted(retryCounter);
      }
    } finally {
      if (interrupted) {
        // Do not hide the interrupt from the caller.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Executes the prepared shell and collects its result. A non-zero exit is rethrown with the
   * captured stdout appended to the message and the original exception attached as the cause.
   */
  private static Pair<Integer, String> runShell(Shell.ShellCommandExecutor shell)
      throws IOException {
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException.
      Shell.ExitCodeException detailed = new Shell.ExitCodeException(ex.getExitCode(),
          "stderr: " + ex.getMessage() + ", stdout: " + output);
      detailed.initCause(ex);
      throw detailed;
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
        shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  /**
   * Sleeps until the next retry is due.
   * @return true if the sleep was cut short by an interrupt, false otherwise
   */
  private static boolean sleepInterrupted(RetryCounter retryCounter) {
    try {
      retryCounter.sleepUntilNextRetry();
      return false;
    } catch (InterruptedException ex) {
      LOG.warn("Sleep Interrupted:", ex);
      return true;
    }
  }

  /**
   * Logs the failure and returns if another attempt is allowed; otherwise rethrows {@code ex}.
   */
  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
      String hostname, String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn("Remote command: {} , hostname:{} failed at attempt {}. Retrying until "
          + "maxAttempts: {}. Exception: {}", StringUtils.join(cmd, " "), hostname,
          retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(), ex.getMessage());
      return;
    }
    throw ex;
  }

  /** Runs the provider's command for {@code op} on the host, with retries. */
  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  /**
   * Sends the given signal to the service's process(es) on the host, with retries.
   * @throws IOException if the remote command keeps failing
   */
  public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
    execWithRetries(hostname, service,
      getCommandProvider(service).signalCommand(service, signal.toString()));
  }

  /**
   * @return true if the provider's is-running command produced any output on the host
   */
  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret = execWithRetries(hostname, service,
      getCommandProvider(service).isRunningCommand(service)).getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGCONT, hostname);
  }
}