/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A default cluster manager for HBase. Uses SSH and HBase shell scripts
 * to manage the cluster. Assumes Unix-like commands such as 'ps' and 'kill' are available on
 * the remote machines, and that the user running the test has sufficient privileges to start
 * and stop servers there (for example, the test user could be the same user the daemon runs
 * as).
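 * <p>
 * A minimal usage sketch; the host, port, and configuration values below are
 * illustrative, not defaults:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.set("hbase.it.clustermanager.ssh.user", "hbase");
 * conf.set("hbase.it.clustermanager.ssh.opts", "-o StrictHostKeyChecking=no");
 * HBaseClusterManager manager = new HBaseClusterManager();
 * manager.setConf(conf);
 * // Illustrative host and port, not defaults.
 * manager.restart(ServiceType.HBASE_REGIONSERVER, "rs1.example.com", 16020);
 * }</pre>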
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {
  private static final String SIGKILL = "SIGKILL";
  private static final String SIGSTOP = "SIGSTOP";
  private static final String SIGCONT = "SIGCONT";

  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format used to execute the remote command. Arguments:
   * 1 SSH options, 2 user name, 3 "@" if the user name is set, 4 host,
   * 5 original command, 6 service user.
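   * For example, with SSH options "-p 22", user "bob", host "rs1", service user "hbase" and the
   * command "ps ux" (all values illustrative), this expands to:
   * {@code timeout 30 /usr/bin/ssh -p 22 bob@rs1 "sudo -u hbase ps ux"}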
   */
  private static final String DEFAULT_TUNNEL_CMD =
      "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  private String tunnelCmd;

  /**
   * The command format used to execute the remote command with sudo. Arguments:
   * 1 SSH options, 2 user name, 3 "@" if the user name is set, 4 host,
   * 5 original command, 6 timeout.
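   * For example, with SSH options "-p 22", user "bob", host "rs1", a 30 second timeout and the
   * command "reboot" (all values illustrative), this expands to:
   * {@code timeout 30.0 /usr/bin/ssh -p 22 bob@rs1 "sudo reboot"}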
   */
  private static final String DEFAULT_TUNNEL_SUDO_CMD =
      "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
  private String tunnelSudoCmd;

  private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  private static final int DEFAULT_RETRY_ATTEMPTS = 5;

  private static final String RETRY_SLEEP_INTERVAL_KEY =
      "hbase.it.clustermanager.retry.sleep.interval";
  private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // The no-arg Configured constructor calls setConf(null) before the real conf is set.
      return;
    }
    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
    sshOptions = System.getenv("HBASE_SSH_OPTS");
    if (!extraSshOptions.isEmpty()) {
      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
    }
    sshOptions = (sshOptions == null) ? "" : sshOptions;
    sshUserName = (sshUserName == null) ? "" : sshUserName;
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
    tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
    // Print out ssh special config if any.
    if (!sshUserName.isEmpty() || !sshOptions.isEmpty()) {
      LOG.info("Running with SSH user [{}] and options [{}]", sshUserName, sshOptions);
    }

    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

  private String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands on a remote host over SSH, running them as the service user via sudo.
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private String hostname;
    private String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    @Override
    public String[] getExecString() {
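      // Wrap the original exec string in the SSH tunnel command so that Shell runs it on the
      // remote host as the service user.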
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Executes commands on a remote host over SSH, running them with root privileges via sudo.
   */
  protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
    private String hostname;

    public RemoteSudoShell(String hostname, String[] execString, long timeout) {
      this(hostname, execString, null, null, timeout);
    }

    public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
      String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
          timeOutInterval / 1000f);
      LOG.info("Executing full command [{}]", cmd);
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }
  }

  /**
   * Provides command strings for services to be executed by Shell. CommandProviders are
   * pluggable, and different deployments (Windows, Bigtop, etc.) can be managed by
   * plugging in custom CommandProviders or ClusterManagers.
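   * <p>
   * A hypothetical provider (illustrative only, not part of this class) would just supply the
   * start/stop/restart command line, for example:
   * <pre>{@code
   * // Sketch: drive services through systemd units named after the ServiceType (assumed naming).
   * static class SystemdCommandProvider extends CommandProvider {
   *   public String getCommand(ServiceType service, Operation op) {
   *     return String.format("systemctl %s %s", op.toString().toLowerCase(Locale.ROOT), service);
   *   }
   * }
   * }</pre>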
   */
  static abstract class CommandProvider {

    enum Operation {
      START, STOP, RESTART
    }

    public abstract String getCommand(ServiceType service, Operation op);

    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

    protected String findPidCommand(ServiceType service) {
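      // Matches the proc_<service> marker that the hbase/hadoop launch scripts put on the
      // daemon's java command line, drops the grep itself, and prints only the PID column.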
212      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
213          service);
214    }
215
216    public String signalCommand(ServiceType service, String signal) {
217      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
218    }
219  }
220
221  /**
222   * CommandProvider to manage the service using bin/hbase-* scripts
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
        System.getenv("HBASE_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
        System.getenv("HBASE_CONF_DIR"));
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
          op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using sbin/hadoop-* scripts.
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home",
          System.getenv("HADOOP_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir",
          System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
260        throw new IOException("Hadoop home configuration parameter i.e. " +
261          "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
          op.toString().toLowerCase(Locale.ROOT), service);
    }
  }

  /**
   * CommandProvider to manage the service using bin/zk* scripts.
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;
    private final String confDir;

    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home",
          System.getenv("ZOOBINDIR"));
      String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir",
          System.getenv("ZOOCFGDIR"));
      if (zookeeperHome == null) {
290        throw new IOException("ZooKeeper home configuration parameter i.e. " +
291          "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s", zookeeperHome,
          op.toString().toLowerCase(Locale.ROOT));
    }

    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
        service);
    }
  }

  public HBaseClusterManager() {
  }

  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
      case HADOOP_NAMENODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        return new HBaseShellCommandProvider(getConf());
    }
  }

  /**
   * Execute the given command on the host using SSH, running it as the service user.
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
    throws IOException {
    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
        hostname);

    RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException.
      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
        + ", stdout: " + output);
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
        shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service,
      String... cmd) throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return exec(hostname, service, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      try {
        retryCounter.sleepUntilNextRetry();
      } catch (InterruptedException ex) {
        // ignore
        LOG.warn("Sleep Interrupted:", ex);
      }
    }
  }

  /**
   * Execute the given command on the host using SSH with sudo, enforcing the given timeout.
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong.
   */
  public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
      throws IOException {
    LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
        hostname);

    RemoteSudoShell shell = new RemoteSudoShell(hostname, cmd, timeout);
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException.
      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
          + ", stdout: " + output);
    }

    LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
        shell.getOutput());

    return new Pair<>(shell.getExitCode(), shell.getOutput());
  }

  public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
      throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return execSudo(hostname, timeout, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      try {
        retryCounter.sleepUntilNextRetry();
      } catch (InterruptedException ex) {
        // ignore
        LOG.warn("Sleep Interrupted:", ex);
      }
    }
  }

  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
      String hostname, String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
        + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
          + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
      return;
    }
    throw ex;
  }

  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  public void signal(ServiceType service, String signal, String hostname) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal));
  }

  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret = execWithRetries(hostname, service,
      getCommandProvider(service).isRunningCommand(service)).getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGCONT, hostname);
  }
}