001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.Locale;
024import java.util.Map;
025import org.apache.commons.lang3.StringUtils;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.conf.Configured;
028import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
029import org.apache.hadoop.hbase.util.Pair;
030import org.apache.hadoop.hbase.util.RetryCounter;
031import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
032import org.apache.hadoop.hbase.util.RetryCounterFactory;
033import org.apache.hadoop.util.Shell;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A default cluster manager for HBase. Uses SSH, and hbase shell scripts
040 * to manage the cluster. Assumes Unix-like commands are available like 'ps',
041 * 'kill', etc. Also assumes the user running the test has enough "power" to start & stop
042 * servers on the remote machines (for example, the test user could be the same user as the
043 * user the daemon is running as)
044 */
045@InterfaceAudience.Private
046public class HBaseClusterManager extends Configured implements ClusterManager {
047
  /** POSIX signals sent to remote daemon processes via {@code kill -s}. */
  protected enum Signal {
    SIGKILL,
    SIGSTOP,
    SIGCONT,
  }

  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
  // SSH user and option string; populated from configuration/environment in setConf().
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format that is used to execute the remote command. Arguments:
   * 1 SSH options, 2 user name , 3 "@" if username is set, 4 host,
   * 5 original command, 6 service user.
   */
  private static final String DEFAULT_TUNNEL_CMD =
      "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  // Effective template; overridable via hbase.it.clustermanager.ssh.cmd.
  private String tunnelCmd;

  /**
   * The command format that is used to execute the remote command with sudo. Arguments:
   * 1 SSH options, 2 user name , 3 "@" if username is set, 4 host,
   * 5 original command, 6 timeout.
   */
  private static final String DEFAULT_TUNNEL_SUDO_CMD =
      "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
  // Effective sudo template; overridable via hbase.it.clustermanager.ssh.sudo.cmd.
  private String tunnelSudoCmd;

  // Retry policy knobs for remote command execution (see setConf()).
  static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  static final int DEFAULT_RETRY_ATTEMPTS = 5;

  static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
  static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  // Built in setConf(); produces counters bounding retries of remote commands.
  protected RetryCounterFactory retryCounterFactory;
083
084  @Override
085  public void setConf(Configuration conf) {
086    super.setConf(conf);
087    if (conf == null) {
088      // Configured gets passed null before real conf. Why? I don't know.
089      return;
090    }
091    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
092    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
093    sshOptions = System.getenv("HBASE_SSH_OPTS");
094    if (!extraSshOptions.isEmpty()) {
095      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
096    }
097    sshOptions = (sshOptions == null) ? "" : sshOptions;
098    sshUserName = (sshUserName == null) ? "" : sshUserName;
099    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
100    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
101    // Print out ssh special config if any.
102    if ((sshUserName != null && sshUserName.length() > 0) ||
103        (sshOptions != null && sshOptions.length() > 0)) {
104      LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
105    }
106
107    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
108        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
109        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
110  }
111
112  protected String getServiceUser(ServiceType service) {
113    Configuration conf = getConf();
114    switch (service) {
115      case HADOOP_DATANODE:
116      case HADOOP_NAMENODE:
117        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
118      case ZOOKEEPER_SERVER:
119        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
120      default:
121        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
122    }
123  }
124
125  /**
126   * Executes commands over SSH
127   */
128  protected class RemoteShell extends Shell.ShellCommandExecutor {
129    private String hostname;
130    private String user;
131
132    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
133        long timeout) {
134      super(execString, dir, env, timeout);
135      this.hostname = hostname;
136    }
137
138    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
139      super(execString, dir, env);
140      this.hostname = hostname;
141    }
142
143    public RemoteShell(String hostname, String[] execString, File dir) {
144      super(execString, dir);
145      this.hostname = hostname;
146    }
147
148    public RemoteShell(String hostname, String[] execString) {
149      super(execString);
150      this.hostname = hostname;
151    }
152
153    public RemoteShell(String hostname, String user, String[] execString) {
154      super(execString);
155      this.hostname = hostname;
156      this.user = user;
157    }
158
159    @Override
160    public String[] getExecString() {
161      String at = sshUserName.isEmpty() ? "" : "@";
162      String remoteCmd = StringUtils.join(super.getExecString(), " ");
163      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
164      LOG.info("Executing full command [" + cmd + "]");
165      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
166    }
167  }
168
169  /**
170   * Executes commands over SSH
171   */
172  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
173    private String hostname;
174
175    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
176      this(hostname, execString, null, null, timeout);
177    }
178
179    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
180        long timeout) {
181      super(execString, dir, env, timeout);
182      this.hostname = hostname;
183    }
184
185    @Override
186    public String[] getExecString() {
187      String at = sshUserName.isEmpty() ? "" : "@";
188      String remoteCmd = StringUtils.join(super.getExecString(), " ");
189      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
190          timeOutInterval/1000f);
191      LOG.info("Executing full command [" + cmd + "]");
192      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
193    }
194  }
195
196  /**
197   * Provides command strings for services to be executed by Shell. CommandProviders are
198   * pluggable, and different deployments(windows, bigtop, etc) can be managed by
199   * plugging-in custom CommandProvider's or ClusterManager's.
200   */
201  static abstract class CommandProvider {
202
203    enum Operation {
204      START, STOP, RESTART
205    }
206
207    public abstract String getCommand(ServiceType service, Operation op);
208
209    public String isRunningCommand(ServiceType service) {
210      return findPidCommand(service);
211    }
212
213    protected String findPidCommand(ServiceType service) {
214      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
215          service);
216    }
217
218    public String signalCommand(ServiceType service, String signal) {
219      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
220    }
221  }
222
223  /**
224   * CommandProvider to manage the service using bin/hbase-* scripts
225   */
226  static class HBaseShellCommandProvider extends CommandProvider {
227    private final String hbaseHome;
228    private final String confDir;
229
230    HBaseShellCommandProvider(Configuration conf) {
231      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
232        System.getenv("HBASE_HOME"));
233      String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
234        System.getenv("HBASE_CONF_DIR"));
235      if (tmp != null) {
236        confDir = String.format("--config %s", tmp);
237      } else {
238        confDir = "";
239      }
240    }
241
242    @Override
243    public String getCommand(ServiceType service, Operation op) {
244      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
245          op.toString().toLowerCase(Locale.ROOT), service);
246    }
247  }
248
249  /**
250   * CommandProvider to manage the service using sbin/hadoop-* scripts.
251   */
252  static class HadoopShellCommandProvider extends CommandProvider {
253    private final String hadoopHome;
254    private final String confDir;
255
256    HadoopShellCommandProvider(Configuration conf) throws IOException {
257      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home",
258          System.getenv("HADOOP_HOME"));
259      String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir",
260          System.getenv("HADOOP_CONF_DIR"));
261      if (hadoopHome == null) {
262        throw new IOException("Hadoop home configuration parameter i.e. " +
263          "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
264      }
265      if (tmp != null) {
266        confDir = String.format("--config %s", tmp);
267      } else {
268        confDir = "";
269      }
270    }
271
272    @Override
273    public String getCommand(ServiceType service, Operation op) {
274      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
275          op.toString().toLowerCase(Locale.ROOT), service);
276    }
277  }
278
279  /**
280   * CommandProvider to manage the service using bin/zk* scripts.
281   */
282  static class ZookeeperShellCommandProvider extends CommandProvider {
283    private final String zookeeperHome;
284    private final String confDir;
285
286    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
287      zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home",
288          System.getenv("ZOOBINDIR"));
289      String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir",
290          System.getenv("ZOOCFGDIR"));
291      if (zookeeperHome == null) {
292        throw new IOException("ZooKeeper home configuration parameter i.e. " +
293          "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
294      }
295      if (tmp != null) {
296        confDir = String.format("--config %s", tmp);
297      } else {
298        confDir = "";
299      }
300    }
301
302    @Override
303    public String getCommand(ServiceType service, Operation op) {
304      return String.format("%s/bin/zkServer.sh %s", zookeeperHome, op.toString().toLowerCase(Locale.ROOT));
305    }
306
307    @Override
308    protected String findPidCommand(ServiceType service) {
309      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
310        service);
311    }
312  }
313
  /** No-arg constructor; configuration is injected later via {@code setConf(Configuration)}. */
  public HBaseClusterManager() {
  }
316
317  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
318    switch (service) {
319      case HADOOP_DATANODE:
320      case HADOOP_NAMENODE:
321        return new HadoopShellCommandProvider(getConf());
322      case ZOOKEEPER_SERVER:
323        return new ZookeeperShellCommandProvider(getConf());
324      default:
325        return new HBaseShellCommandProvider(getConf());
326    }
327  }
328
329  /**
330   * Execute the given command on the host using SSH
331   * @return pair of exit code and command output
332   * @throws IOException if something goes wrong.
333   */
334  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
335    throws IOException {
336    LOG.info("Executing remote command: {}, hostname:{}", StringUtils.join(cmd, " "),
337        hostname);
338
339    RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
340    try {
341      shell.execute();
342    } catch (Shell.ExitCodeException ex) {
343      // capture the stdout of the process as well.
344      String output = shell.getOutput();
345      // add output for the ExitCodeException.
346      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
347        + ", stdout: " + output);
348    }
349
350    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
351        shell.getOutput());
352
353    return new Pair<>(shell.getExitCode(), shell.getOutput());
354  }
355
356  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
357      throws IOException {
358    RetryCounter retryCounter = retryCounterFactory.create();
359    while (true) {
360      try {
361        return exec(hostname, service, cmd);
362      } catch (IOException e) {
363        retryOrThrow(retryCounter, e, hostname, cmd);
364      }
365      try {
366        retryCounter.sleepUntilNextRetry();
367      } catch (InterruptedException ex) {
368        // ignore
369        LOG.warn("Sleep Interrupted:", ex);
370      }
371    }
372  }
373
374  /**
375   * Execute the given command on the host using SSH
376   * @return pair of exit code and command output
377   * @throws IOException if something goes wrong.
378   */
379  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
380      throws IOException {
381    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
382        hostname);
383
384    RemoteSudoShell shell = new RemoteSudoShell(hostname, cmd, timeout);
385    try {
386      shell.execute();
387    } catch (Shell.ExitCodeException ex) {
388      // capture the stdout of the process as well.
389      String output = shell.getOutput();
390      // add output for the ExitCodeException.
391      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
392          + ", stdout: " + output);
393    }
394
395    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
396        shell.getOutput());
397
398    return new Pair<>(shell.getExitCode(), shell.getOutput());
399  }
400
401  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
402      throws IOException {
403    RetryCounter retryCounter = retryCounterFactory.create();
404    while (true) {
405      try {
406        return execSudo(hostname, timeout, cmd);
407      } catch (IOException e) {
408        retryOrThrow(retryCounter, e, hostname, cmd);
409      }
410      try {
411        retryCounter.sleepUntilNextRetry();
412      } catch (InterruptedException ex) {
413        // ignore
414        LOG.warn("Sleep Interrupted:", ex);
415      }
416    }
417  }
418
419  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
420      String hostname, String[] cmd) throws E {
421    if (retryCounter.shouldRetry()) {
422      LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
423        + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
424          + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
425      return;
426    }
427    throw ex;
428  }
429
430  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
431    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
432  }
433
  /** Starts the service on {@code hostname}; {@code port} is unused by this SSH-based manager. */
  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  /** Stops the service on {@code hostname}; {@code port} is unused by this SSH-based manager. */
  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  /** Restarts the service on {@code hostname}; {@code port} is unused by this SSH-based manager. */
  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }
448
  /** Sends {@code signal} to the service's processes on {@code hostname} via {@code kill -s}. */
  public void signal(ServiceType service, Signal signal, String hostname) throws IOException {
    execWithRetries(hostname, service,
      getCommandProvider(service).signalCommand(service, signal.toString()));
  }
453
454  @Override
455  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
456    String ret = execWithRetries(hostname, service,
457      getCommandProvider(service).isRunningCommand(service)).getSecond();
458    return ret.length() > 0;
459  }
460
  /** Forcefully kills the service's processes on {@code hostname} via SIGKILL. */
  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGKILL, hostname);
  }

  /** Pauses the service's processes on {@code hostname} via SIGSTOP. */
  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGSTOP, hostname);
  }

  /** Resumes previously suspended processes on {@code hostname} via SIGCONT. */
  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, Signal.SIGCONT, hostname);
  }
475}