/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A default cluster manager for HBase. It uses SSH and the HBase/Hadoop/ZooKeeper shell
 * scripts to manage the cluster, and assumes Unix-like commands such as 'ps' and 'kill' are
 * available on the remote hosts. It also assumes the user running the test has enough
 * privileges to start and stop servers on the remote machines (for example, the test user
 * could be the same user the daemon runs as).
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {
  private static final String SIGKILL = "SIGKILL";
  private static final String SIGSTOP = "SIGSTOP";
  private static final String SIGCONT = "SIGCONT";

  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format that is used to execute the remote command. Arguments:
   * 1 SSH options, 2 user name, 3 "@" if username is set, 4 host,
   * 5 original command, 6 service user.
   */
  private static final String DEFAULT_TUNNEL_CMD =
      "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  private String tunnelCmd;

  /**
   * The command format that is used to execute the remote command with sudo. Arguments:
   * 1 SSH options, 2 user name, 3 "@" if username is set, 4 host,
   * 5 original command, 6 timeout (in seconds).
   */
  private static final String DEFAULT_TUNNEL_SUDO_CMD =
      "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
  private String tunnelSudoCmd;

  private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  private static final int DEFAULT_RETRY_ATTEMPTS = 5;

  private static final String RETRY_SLEEP_INTERVAL_KEY =
      "hbase.it.clustermanager.retry.sleep.interval";
  private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  /** A single attempt at running a remote command; used by {@link #runWithRetries}. */
  @FunctionalInterface
  private interface RemoteCommand {
    Pair<Integer, String> run() throws IOException;
  }

  /**
   * Reads SSH, tunnel-command and retry settings from {@code conf}.
   * SSH options are the {@code HBASE_SSH_OPTS} environment variable followed by
   * {@code hbase.it.clustermanager.ssh.opts}, separated by a single space when both are set.
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // Configured's constructor calls setConf(null) before the real configuration is
      // supplied; there is nothing to initialize yet.
      return;
    }
    sshUserName = StringUtils.defaultString(conf.get("hbase.it.clustermanager.ssh.user", ""));
    String envSshOptions = StringUtils.defaultString(System.getenv("HBASE_SSH_OPTS"));
    String extraSshOptions =
        StringUtils.defaultString(conf.get("hbase.it.clustermanager.ssh.opts", ""));
    // Join only the non-empty parts so no stray separator ends up in the ssh command line.
    if (extraSshOptions.isEmpty()) {
      sshOptions = envSshOptions;
    } else if (envSshOptions.isEmpty()) {
      sshOptions = extraSshOptions;
    } else {
      sshOptions = envSshOptions + " " + extraSshOptions;
    }
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
    // Print out ssh special config if any.
    if (!sshUserName.isEmpty() || !sshOptions.isEmpty()) {
      LOG.info("Running with SSH user [{}] and options [{}]", sshUserName, sshOptions);
    }

    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

  /** Returns the OS user the given service's commands are run as (via {@code sudo -u}). */
  private String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands over SSH, wrapped in the configured tunnel command
   * ({@code hbase.it.clustermanager.ssh.cmd}). The wrapped command is run as {@code user};
   * NOTE(review): constructors that take no user leave it {@code null}, which the default
   * tunnel template renders literally as {@code sudo -u null}.
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private final String hostname;
    private final String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
      this.user = null;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
      this.user = null;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
      this.user = null;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = null;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Executes commands over SSH with sudo, wrapped in the configured sudo tunnel command
   * ({@code hbase.it.clustermanager.ssh.sudo.cmd}). The executor timeout (milliseconds) is
   * also passed to the template converted to seconds.
   */
  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
    private final String hostname;

    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
      this(hostname, execString, null, null, timeout);
    }

    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
          timeOutInterval / 1000f);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are
   * pluggable, and different deployments (Windows, Bigtop, etc.) can be managed by
   * plugging in custom CommandProviders or ClusterManagers.
   */
  static abstract class CommandProvider {

    enum Operation {
      START, STOP, RESTART
    }

    public abstract String getCommand(ServiceType service, Operation op);

    /** A command whose output is non-empty iff the service's process is found. */
    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

    /** A command that prints the PID(s) of processes whose command line matches the service. */
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
          service);
    }

    /** A command that sends {@code signal} to every PID reported by {@link #findPidCommand}. */
    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider to manage the service using bin/hbase-* scripts.
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
          System.getenv("HBASE_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
          System.getenv("HBASE_CONF_DIR"));
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
          op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using sbin/hadoop-* scripts.
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home",
          System.getenv("HADOOP_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir",
          System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
        throw new IOException("Hadoop home configuration parameter i.e. " +
            "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
          op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using bin/zk* scripts.
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;
    // NOTE(review): computed from configuration but not currently passed to zkServer.sh by
    // getCommand(); confirm whether that is intended.
    private final String confDir;

    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home",
          System.getenv("ZOOBINDIR"));
      String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir",
          System.getenv("ZOOCFGDIR"));
      if (zookeeperHome == null) {
        throw new IOException("ZooKeeper home configuration parameter i.e. " +
            "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s", zookeeperHome,
          op.toString().toLowerCase(Locale.ROOT));
    }

    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
          service);
    }
  }

  public HBaseClusterManager() {
  }

  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        return new HBaseShellCommandProvider(getConf());
    }
  }

  /**
   * Runs an already-built remote shell once, logging the command and its result.
   * On a non-zero exit, rethrows an {@link Shell.ExitCodeException} whose message carries both
   * stderr and stdout and whose cause is the original exception.
   */
  private Pair<Integer, String> runRemote(String hostname, Shell.ShellCommandExecutor shell,
      String[] cmd) throws IOException {
    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
        hostname);
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // Capture the stdout of the process as well, and keep the original as the cause.
      Shell.ExitCodeException withOutput = new Shell.ExitCodeException(ex.getExitCode(),
          "stderr: " + ex.getMessage() + ", stdout: " + shell.getOutput());
      withOutput.initCause(ex);
      throw withOutput;
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
        shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  /**
   * Repeats {@code command} until it succeeds or the retry budget is exhausted, sleeping
   * between attempts. If the thread is interrupted while sleeping, the interrupt status is
   * restored and an {@link InterruptedIOException} is thrown instead of retrying further.
   */
  private Pair<Integer, String> runWithRetries(String hostname, String[] cmd,
      RemoteCommand command) throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return command.run();
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      try {
        retryCounter.sleepUntilNextRetry();
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        InterruptedIOException iioe = new InterruptedIOException(
            "Interrupted while waiting to retry remote command: " + StringUtils.join(cmd, " ")
                + " , hostname:" + hostname);
        iioe.initCause(ex);
        throw iioe;
      }
    }
  }

  /**
   * Execute the given command on the host using SSH, as the service's configured user.
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
      throws IOException {
    return runRemote(hostname, new RemoteShell(hostname, getServiceUser(service), cmd), cmd);
  }

  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service,
      String... cmd) throws IOException {
    return runWithRetries(hostname, cmd, () -> exec(hostname, service, cmd));
  }

  /**
   * Execute the given command on the host using SSH and sudo.
   * @param timeout executor timeout in milliseconds; the default sudo tunnel template also
   *          passes it, converted to seconds, to the remote {@code timeout} command
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
      throws IOException {
    return runRemote(hostname, new RemoteSudoShell(hostname, cmd, timeout), cmd);
  }

  /**
   * Like {@link #execSudo}, but retried according to the configured retry policy.
   * @throws InterruptedIOException if interrupted while waiting between attempts
   */
  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
      throws IOException {
    return runWithRetries(hostname, cmd, () -> execSudo(hostname, timeout, cmd));
  }

  /** Returns normally (after logging) if another attempt is allowed; otherwise rethrows. */
  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
      String hostname, String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn("Remote command: {} , hostname:{} failed at attempt {}. Retrying until "
          + "maxAttempts: {}. Exception: {}", StringUtils.join(cmd, " "), hostname,
          retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(), ex.getMessage());
      return;
    }
    throw ex;
  }

  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  public void signal(ServiceType service, String signal, String hostname) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal));
  }

  /** A service is considered running when the PID-finding command prints anything. */
  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret = execWithRetries(hostname, service,
        getCommandProvider(service).isRunningCommand(service)).getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGCONT, hostname);
  }
}