001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
037
038/**
039 * A default cluster manager for HBase. Uses SSH, and hbase shell scripts to manage the cluster.
040 * Assumes Unix-like commands are available like 'ps', 'kill', etc. Also assumes the user running
041 * the test has enough "power" to start & stop servers on the remote machines (for example, the test
042 * user could be the same user as the user the daemon is running as)
043 */
044@InterfaceAudience.Private
045public class HBaseClusterManager extends Configured implements ClusterManager {
046
047  protected enum Signal {
048    SIGKILL,
049    SIGSTOP,
050    SIGCONT,
051  }
052
053  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
054  private String sshUserName;
055  private String sshOptions;
056
057  /**
058   * The command format that is used to execute the remote command. Arguments: 1 SSH options, 2 user
059   * name , 3 "@" if username is set, 4 host, 5 original command, 6 service user.
060   */
061  private static final String DEFAULT_TUNNEL_CMD =
062    "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
063  private String tunnelCmd;
064
065  /**
066   * The command format that is used to execute the remote command with sudo. Arguments: 1 SSH
067   * options, 2 user name , 3 "@" if username is set, 4 host, 5 original command, 6 timeout.
068   */
069  private static final String DEFAULT_TUNNEL_SUDO_CMD =
070    "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
071  private String tunnelSudoCmd;
072
073  static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
074  static final int DEFAULT_RETRY_ATTEMPTS = 5;
075
076  static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
077  static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
078
079  protected RetryCounterFactory retryCounterFactory;
080
081  @Override
082  public void setConf(Configuration conf) {
083    super.setConf(conf);
084    if (conf == null) {
085      // Configured gets passed null before real conf. Why? I don't know.
086      return;
087    }
088    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
089    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "-o ConnectTimeout=10");
090    sshOptions = System.getenv("HBASE_SSH_OPTS");
091    if (!extraSshOptions.isEmpty()) {
092      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
093    }
094    sshOptions = (sshOptions == null) ? "" : sshOptions;
095    sshUserName = (sshUserName == null) ? "" : sshUserName;
096    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
097    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
098    // Print out ssh special config if any.
099    if (
100      (sshUserName != null && sshUserName.length() > 0)
101        || (sshOptions != null && sshOptions.length() > 0)
102    ) {
103      LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
104    }
105
106    this.retryCounterFactory = new RetryCounterFactory(
107      new RetryConfig().setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
108        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
109  }
110
111  protected String getServiceUser(ServiceType service) {
112    Configuration conf = getConf();
113    switch (service) {
114      case HADOOP_DATANODE:
115      case HADOOP_NAMENODE:
116        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
117      case ZOOKEEPER_SERVER:
118        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
119      default:
120        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
121    }
122  }
123
124  /**
125   * Executes commands over SSH
126   */
127  protected class RemoteShell extends Shell.ShellCommandExecutor {
128    private String hostname;
129    private String user;
130
131    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
132      long timeout) {
133      super(execString, dir, env, timeout);
134      this.hostname = hostname;
135    }
136
137    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
138      super(execString, dir, env);
139      this.hostname = hostname;
140    }
141
142    public RemoteShell(String hostname, String[] execString, File dir) {
143      super(execString, dir);
144      this.hostname = hostname;
145    }
146
147    public RemoteShell(String hostname, String[] execString) {
148      super(execString);
149      this.hostname = hostname;
150    }
151
152    public RemoteShell(String hostname, String user, String[] execString) {
153      super(execString);
154      this.hostname = hostname;
155      this.user = user;
156    }
157
158    @Override
159    public String[] getExecString() {
160      String at = sshUserName.isEmpty() ? "" : "@";
161      String remoteCmd = StringUtils.join(super.getExecString(), " ");
162      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
163      LOG.info("Executing full command [" + cmd + "]");
164      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
165    }
166  }
167
168  /**
169   * Executes commands over SSH
170   */
171  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
172    private String hostname;
173
174    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
175      this(hostname, execString, null, null, timeout);
176    }
177
178    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
179      long timeout) {
180      super(execString, dir, env, timeout);
181      this.hostname = hostname;
182    }
183
184    @Override
185    public String[] getExecString() {
186      String at = sshUserName.isEmpty() ? "" : "@";
187      String remoteCmd = StringUtils.join(super.getExecString(), " ");
188      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
189        timeOutInterval / 1000f);
190      LOG.info("Executing full command [" + cmd + "]");
191      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
192    }
193  }
194
195  /**
196   * Provides command strings for services to be executed by Shell. CommandProviders are pluggable,
197   * and different deployments(windows, bigtop, etc) can be managed by plugging-in custom
198   * CommandProvider's or ClusterManager's.
199   */
200  static abstract class CommandProvider {
201
202    enum Operation {
203      START,
204      STOP,
205      RESTART
206    }
207
208    public abstract String getCommand(ServiceType service, Operation op);
209
210    public String isRunningCommand(ServiceType service) {
211      return findPidCommand(service);
212    }
213
214    protected String findPidCommand(ServiceType service) {
215      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
216        service);
217    }
218
219    protected String getStateCommand(ServiceType service) {
220      return String.format("%s | xargs ps -o state= -p ", findPidCommand(service));
221    }
222
223    public String signalCommand(ServiceType service, String signal) {
224      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
225    }
226  }
227
228  /**
229   * CommandProvider to manage the service using bin/hbase-* scripts
230   */
231  static class HBaseShellCommandProvider extends CommandProvider {
232    private final String hbaseHome;
233    private final String confDir;
234
235    HBaseShellCommandProvider(Configuration conf) {
236      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home", System.getenv("HBASE_HOME"));
237      String tmp =
238        conf.get("hbase.it.clustermanager.hbase.conf.dir", System.getenv("HBASE_CONF_DIR"));
239      if (tmp != null) {
240        confDir = String.format("--config %s", tmp);
241      } else {
242        confDir = "";
243      }
244    }
245
246    @Override
247    public String getCommand(ServiceType service, Operation op) {
248      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
249        op.toString().toLowerCase(Locale.ROOT), service);
250    }
251  }
252
253  /**
254   * CommandProvider to manage the service using sbin/hadoop-* scripts.
255   */
256  static class HadoopShellCommandProvider extends CommandProvider {
257    private final String hadoopHome;
258    private final String confDir;
259
260    HadoopShellCommandProvider(Configuration conf) throws IOException {
261      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home", System.getenv("HADOOP_HOME"));
262      String tmp =
263        conf.get("hbase.it.clustermanager.hadoop.conf.dir", System.getenv("HADOOP_CONF_DIR"));
264      if (hadoopHome == null) {
265        throw new IOException("Hadoop home configuration parameter i.e. "
266          + "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
267      }
268      if (tmp != null) {
269        confDir = String.format("--config %s", tmp);
270      } else {
271        confDir = "";
272      }
273    }
274
275    @Override
276    public String getCommand(ServiceType service, Operation op) {
277      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
278        op.toString().toLowerCase(Locale.ROOT), service);
279    }
280  }
281
282  /**
283   * CommandProvider to manage the service using bin/zk* scripts.
284   */
285  static class ZookeeperShellCommandProvider extends CommandProvider {
286    private final String zookeeperHome;
287
288    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
289      zookeeperHome =
290        conf.get("hbase.it.clustermanager.zookeeper.home", System.getenv("ZOOBINDIR"));
291      if (zookeeperHome == null) {
292        throw new IOException("ZooKeeper home configuration parameter i.e. "
293          + "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
294      }
295    }
296
297    @Override
298    public String getCommand(ServiceType service, Operation op) {
299      return String.format("%s/bin/zkServer.sh %s", zookeeperHome,
300        op.toString().toLowerCase(Locale.ROOT));
301    }
302
303    @Override
304    protected String findPidCommand(ServiceType service) {
305      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", service);
306    }
307  }
308
309  public HBaseClusterManager() {
310  }
311
312  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
313    switch (service) {
314      case HADOOP_DATANODE:
315      case HADOOP_NAMENODE:
316        return new HadoopShellCommandProvider(getConf());
317      case ZOOKEEPER_SERVER:
318        return new ZookeeperShellCommandProvider(getConf());
319      default:
320        Class<? extends CommandProvider> provider =
321          getConf().getClass("hbase.it.clustermanager.hbase.command.provider",
322            HBaseShellCommandProvider.class, CommandProvider.class);
323        return ReflectionUtils.newInstance(provider, getConf());
324    }
325  }
326
327  /**
328   * Execute the given command on the host using SSH
329   * @return pair of exit code and command output
330   * @throws IOException if something goes wrong.
331   */
332  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
333    throws IOException {
334    LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "), hostname);
335
336    RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
337    try {
338      shell.execute();
339    } catch (Shell.ExitCodeException ex) {
340      // capture the stdout of the process as well.
341      String output = shell.getOutput();
342      // add output for the ExitCodeException.
343      throw new Shell.ExitCodeException(ex.getExitCode(),
344        "stderr: " + ex.getMessage() + ", stdout: " + output);
345    }
346
347    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
348      shell.getOutput());
349
350    return new Pair<>(shell.getExitCode(), shell.getOutput());
351  }
352
353  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
354    throws IOException {
355    RetryCounter retryCounter = retryCounterFactory.create();
356    while (true) {
357      try {
358        return exec(hostname, service, cmd);
359      } catch (IOException e) {
360        retryOrThrow(retryCounter, e, hostname, cmd);
361      }
362      try {
363        retryCounter.sleepUntilNextRetry();
364      } catch (InterruptedException ex) {
365        // ignore
366        LOG.warn("Sleep Interrupted:", ex);
367      }
368    }
369  }
370
371  /**
372   * Execute the given command on the host using SSH
373   * @return pair of exit code and command output
374   * @throws IOException if something goes wrong.
375   */
376  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
377    throws IOException {
378    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "), hostname);
379
380    RemoteSudoShell shell = new RemoteSudoShell(hostname, cmd, timeout);
381    try {
382      shell.execute();
383    } catch (Shell.ExitCodeException ex) {
384      // capture the stdout of the process as well.
385      String output = shell.getOutput();
386      // add output for the ExitCodeException.
387      throw new Shell.ExitCodeException(ex.getExitCode(),
388        "stderr: " + ex.getMessage() + ", stdout: " + output);
389    }
390
391    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
392      shell.getOutput());
393
394    return new Pair<>(shell.getExitCode(), shell.getOutput());
395  }
396
397  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
398    throws IOException {
399    RetryCounter retryCounter = retryCounterFactory.create();
400    while (true) {
401      try {
402        return execSudo(hostname, timeout, cmd);
403      } catch (IOException e) {
404        retryOrThrow(retryCounter, e, hostname, cmd);
405      }
406      try {
407        retryCounter.sleepUntilNextRetry();
408      } catch (InterruptedException ex) {
409        // ignore
410        LOG.warn("Sleep Interrupted:", ex);
411      }
412    }
413  }
414
415  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, String hostname,
416    String[] cmd) throws E {
417    if (retryCounter.shouldRetry()) {
418      LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
419        + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
420        + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
421      return;
422    }
423    throw ex;
424  }
425
426  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
427    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
428  }
429
430  @Override
431  public void start(ServiceType service, String hostname, int port) throws IOException {
432    exec(hostname, service, Operation.START);
433  }
434
435  @Override
436  public void stop(ServiceType service, String hostname, int port) throws IOException {
437    exec(hostname, service, Operation.STOP);
438  }
439
440  @Override
441  public void restart(ServiceType service, String hostname, int port) throws IOException {
442    exec(hostname, service, Operation.RESTART);
443  }
444
445  public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
446    execWithRetries(hostname, service,
447      getCommandProvider(service).signalCommand(service, signal.toString()));
448  }
449
450  @Override
451  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
452    String ret =
453      execWithRetries(hostname, service, getCommandProvider(service).isRunningCommand(service))
454        .getSecond();
455    return ret.length() > 0;
456  }
457
458  @Override
459  public void kill(ServiceType service, String hostname, int port) throws IOException {
460    signal(service, Signal.SIGKILL, hostname);
461  }
462
463  @Override
464  public void suspend(ServiceType service, String hostname, int port) throws IOException {
465    signal(service, Signal.SIGSTOP, hostname);
466  }
467
468  @Override
469  public void resume(ServiceType service, String hostname, int port) throws IOException {
470    signal(service, Signal.SIGCONT, hostname);
471  }
472
473  public boolean isSuspended(ServiceType service, String hostname, int port) throws IOException {
474    String ret =
475      execWithRetries(hostname, service, getCommandProvider(service).getStateCommand(service))
476        .getSecond();
477    return ret != null && ret.trim().equals("T");
478  }
479
480  public boolean isResumed(ServiceType service, String hostname, int port) throws IOException {
481    String ret =
482      execWithRetries(hostname, service, getCommandProvider(service).getStateCommand(service))
483        .getSecond();
484    return ret != null && !ret.trim().equals("T");
485  }
486}