/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A default cluster manager for HBase. Uses SSH and the hbase shell scripts to manage the
 * cluster. Assumes Unix-like commands such as 'ps' and 'kill' are available on the remote
 * machines, and that the user running the test has enough privileges to start and stop the
 * servers there (for example, the test user could be the same user the daemons run as).
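 * <p>
 * A rough usage sketch (the configuration values, host name, and port below are placeholders;
 * integration tests normally obtain the manager through the test framework rather than
 * constructing it directly):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.set("hbase.it.clustermanager.ssh.user", "hbase");           // optional SSH user
 * conf.set("hbase.it.clustermanager.ssh.opts", "-i /path/to/key"); // optional SSH options
 * HBaseClusterManager manager = new HBaseClusterManager();
 * manager.setConf(conf);
 * manager.restart(ServiceType.HBASE_REGIONSERVER, "rs-host.example.com", 16020);
 * }</pre>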
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {

  protected enum Signal {
    SIGKILL,
    SIGSTOP,
    SIGCONT,
  }

  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format that is used to execute the remote command. Arguments:
   * 1 SSH options, 2 user name, 3 "@" if username is set, 4 host,
   * 5 original command, 6 service user.
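   * <p>
   * For illustration only: with no SSH user or extra options configured, a command such as
   * {@code ls} run on host {@code rs-host} as service user {@code hbase} expands to roughly
   * {@code timeout 30 /usr/bin/ssh rs-host "sudo -u hbase ls"}.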
   */
  private static final String DEFAULT_TUNNEL_CMD =
      "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  private String tunnelCmd;

  /**
   * The command format that is used to execute the remote command with sudo. Arguments:
   * 1 SSH options, 2 user name, 3 "@" if username is set, 4 host,
   * 5 original command, 6 timeout.
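   * <p>
   * For illustration only: with no SSH user or options and a 30 second timeout, a command such
   * as {@code ls} run on host {@code rs-host} expands to roughly
   * {@code timeout 30.0 /usr/bin/ssh rs-host "sudo ls"}.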
   */
  private static final String DEFAULT_TUNNEL_SUDO_CMD =
      "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
  private String tunnelSudoCmd;

  static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  static final int DEFAULT_RETRY_ATTEMPTS = 5;

  static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
  static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // The no-arg Configured constructor calls setConf(null) before the real conf is set;
      // nothing to do in that case.
      return;
    }
    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
    sshOptions = System.getenv("HBASE_SSH_OPTS");
    if (!extraSshOptions.isEmpty()) {
      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
    }
    sshOptions = (sshOptions == null) ? "" : sshOptions;
    sshUserName = (sshUserName == null) ? "" : sshUserName;
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
    // Print out ssh special config if any.
    if ((sshUserName != null && sshUserName.length() > 0) ||
        (sshOptions != null && sshOptions.length() > 0)) {
      LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
    }

    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

  protected String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands on a remote host over SSH, running them as the given service user.
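   * <p>
   * A minimal sketch of how this is used internally (host and command are placeholders):
   * <pre>{@code
   * RemoteShell shell = new RemoteShell("rs-host.example.com", "hbase", new String[] { "ls" });
   * shell.execute();
   * LOG.info("exit code: {}, output: {}", shell.getExitCode(), shell.getOutput());
   * }</pre>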
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private String hostname;
    private String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [" + cmd + "]");
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Executes commands on a remote host over SSH, running them as root via sudo.
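   * <p>
   * A minimal sketch (host, command, and timeout are placeholders):
   * <pre>{@code
   * RemoteSudoShell shell = new RemoteSudoShell("rs-host.example.com",
   *     new String[] { "whoami" }, 30000);
   * shell.execute();
   * }</pre>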
   */
  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
    private String hostname;

    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
      this(hostname, execString, null, null, timeout);
    }

    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
          timeOutInterval/1000f);
      LOG.info("Executing full command [" + cmd + "]");
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are
   * pluggable, and different deployments (Windows, Bigtop, etc.) can be managed by plugging in
   * custom CommandProviders or ClusterManagers.
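   * <p>
   * For the HBase services, a custom provider can be selected through the
   * {@code hbase.it.clustermanager.hbase.command.provider} key. A hypothetical sketch (the class
   * name is illustrative; the provider is instantiated reflectively, so it should expose a
   * constructor taking a {@link Configuration}):
   * <pre>{@code
   * public class InitdCommandProvider extends CommandProvider {
   *   public InitdCommandProvider(Configuration conf) {
   *   }
   *   public String getCommand(ServiceType service, Operation op) {
   *     return String.format("service %s %s", service, op.toString().toLowerCase(Locale.ROOT));
   *   }
   * }
   * // then set hbase.it.clustermanager.hbase.command.provider to the provider's class name
   * }</pre>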
   */
  static abstract class CommandProvider {

    enum Operation {
      START, STOP, RESTART
    }

    public abstract String getCommand(ServiceType service, Operation op);

    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
          service);
    }

    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs sudo kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider to manage the service using bin/hbase-* scripts.
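   * <p>
   * For example, restarting a region server typically expands to something like
   * {@code <hbase home>/bin/hbase-daemon.sh --config <conf dir> restart regionserver}
   * (the {@code --config} part is omitted when no conf dir is configured).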
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
        System.getenv("HBASE_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
        System.getenv("HBASE_CONF_DIR"));
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
          op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using sbin/hadoop-* scripts.
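   * <p>
   * For example, stopping a datanode typically expands to something like
   * {@code <hadoop home>/sbin/hadoop-daemon.sh --config <conf dir> stop datanode}.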
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home",
          System.getenv("HADOOP_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir",
          System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
        throw new IOException("Hadoop home configuration parameter i.e. " +
          "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
          op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using bin/zk* scripts.
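   * <p>
   * For example, restarting the server typically expands to something like
   * {@code <zookeeper home>/bin/zkServer.sh restart}.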
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;
    private final String confDir;

    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home",
          System.getenv("ZOOBINDIR"));
      String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir",
          System.getenv("ZOOCFGDIR"));
      if (zookeeperHome == null) {
        throw new IOException("ZooKeeper home configuration parameter i.e. " +
          "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s", zookeeperHome,
        op.toString().toLowerCase(Locale.ROOT));
    }

    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
        service);
    }
  }

  public HBaseClusterManager() {
  }

  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        Class<? extends CommandProvider> provider = getConf()
          .getClass("hbase.it.clustermanager.hbase.command.provider",
            HBaseShellCommandProvider.class, CommandProvider.class);
        return ReflectionUtils.newInstance(provider, getConf());
    }
  }

  /**
   * Executes the given command on the host over SSH, as the service user.
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
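   * <p>
   * A minimal sketch (the host name and command are placeholders):
   * <pre>{@code
   * Pair<Integer, String> result = exec("rs-host.example.com", ServiceType.HBASE_REGIONSERVER,
   *   "ls", "-l", "/tmp");
   * LOG.info("exit code: {}, output: {}", result.getFirst(), result.getSecond());
   * }</pre>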
   */
  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "),
        hostname);

    RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException.
      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
        + ", stdout: " + output);
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
        shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
      throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return exec(hostname, service, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      try {
        retryCounter.sleepUntilNextRetry();
      } catch (InterruptedException ex) {
        // ignore
        LOG.warn("Sleep Interrupted:", ex);
      }
    }
  }

  /**
   * Executes the given command on the host over SSH with sudo, subject to the given timeout.
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
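   * <p>
   * A minimal sketch (the host name, timeout, and command are placeholders):
   * <pre>{@code
   * Pair<Integer, String> result = execSudo("rs-host.example.com", 30000, "whoami");
   * }</pre>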
   */
  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
      throws IOException {
    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
        hostname);

    RemoteSudoShell shell = new RemoteSudoShell(hostname, cmd, timeout);
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException.
      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
          + ", stdout: " + output);
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
        shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
      throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return execSudo(hostname, timeout, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      try {
        retryCounter.sleepUntilNextRetry();
      } catch (InterruptedException ex) {
        // ignore
        LOG.warn("Sleep Interrupted:", ex);
      }
    }
  }

  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
      String hostname, String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
        + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
          + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
      return;
    }
    throw ex;
  }

  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
    execWithRetries(hostname, service,
      getCommandProvider(service).signalCommand(service, signal.toString()));
  }

  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret = execWithRetries(hostname, service,
      getCommandProvider(service).isRunningCommand(service)).getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGCONT, hostname);
  }
}