001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.Locale;
024import java.util.Map;
025
026import org.apache.commons.lang3.StringUtils;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.conf.Configured;
029import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
030import org.apache.hadoop.hbase.util.Pair;
031import org.apache.hadoop.hbase.util.RetryCounter;
032import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
033import org.apache.hadoop.hbase.util.RetryCounterFactory;
034import org.apache.hadoop.util.Shell;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * A default cluster manager for HBase. Uses SSH, and hbase shell scripts
041 * to manage the cluster. Assumes Unix-like commands are available like 'ps',
042 * 'kill', etc. Also assumes the user running the test has enough "power" to start & stop
043 * servers on the remote machines (for example, the test user could be the same user as the
044 * user the daemon is running as)
045 */
046@InterfaceAudience.Private
047public class HBaseClusterManager extends Configured implements ClusterManager {
048  private static final String SIGKILL = "SIGKILL";
049  private static final String SIGSTOP = "SIGSTOP";
050  private static final String SIGCONT = "SIGCONT";
051
052  protected static final Logger LOG = LoggerFactory.getLogger(HBaseClusterManager.class);
053  private String sshUserName;
054  private String sshOptions;
055
056  /**
057   * The command format that is used to execute the remote command. Arguments:
058   * 1 SSH options, 2 user name , 3 "@" if username is set, 4 host,
059   * 5 original command, 6 service user.
060   */
061  private static final String DEFAULT_TUNNEL_CMD =
062      "timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
063  private String tunnelCmd;
064
065  private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
066  private static final int DEFAULT_RETRY_ATTEMPTS = 5;
067
068  private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
069  private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
070
071  protected RetryCounterFactory retryCounterFactory;
072
073  @Override
074  public void setConf(Configuration conf) {
075    super.setConf(conf);
076    if (conf == null) {
077      // Configured gets passed null before real conf. Why? I don't know.
078      return;
079    }
080    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
081    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
082    sshOptions = System.getenv("HBASE_SSH_OPTS");
083    if (!extraSshOptions.isEmpty()) {
084      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
085    }
086    sshOptions = (sshOptions == null) ? "" : sshOptions;
087    sshUserName = (sshUserName == null) ? "" : sshUserName;
088    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
089    // Print out ssh special config if any.
090    if ((sshUserName != null && sshUserName.length() > 0) ||
091        (sshOptions != null && sshOptions.length() > 0)) {
092      LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
093    }
094
095    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
096        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
097        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
098  }
099
100  private String getServiceUser(ServiceType service) {
101    Configuration conf = getConf();
102    switch (service) {
103      case HADOOP_DATANODE:
104        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
105      case ZOOKEEPER_SERVER:
106        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
107      default:
108        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
109    }
110  }
111
112  /**
113   * Executes commands over SSH
114   */
115  protected class RemoteShell extends Shell.ShellCommandExecutor {
116    private String hostname;
117    private String user;
118
119    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
120        long timeout) {
121      super(execString, dir, env, timeout);
122      this.hostname = hostname;
123    }
124
125    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
126      super(execString, dir, env);
127      this.hostname = hostname;
128    }
129
130    public RemoteShell(String hostname, String[] execString, File dir) {
131      super(execString, dir);
132      this.hostname = hostname;
133    }
134
135    public RemoteShell(String hostname, String[] execString) {
136      super(execString);
137      this.hostname = hostname;
138    }
139
140    public RemoteShell(String hostname, String user, String[] execString) {
141      super(execString);
142      this.hostname = hostname;
143      this.user = user;
144    }
145
146    @Override
147    public String[] getExecString() {
148      String at = sshUserName.isEmpty() ? "" : "@";
149      String remoteCmd = StringUtils.join(super.getExecString(), " ");
150      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
151      LOG.info("Executing full command [" + cmd + "]");
152      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
153    }
154
155    @Override
156    public void execute() throws IOException {
157      super.execute();
158    }
159  }
160
161  /**
162   * Provides command strings for services to be executed by Shell. CommandProviders are
163   * pluggable, and different deployments(windows, bigtop, etc) can be managed by
164   * plugging-in custom CommandProvider's or ClusterManager's.
165   */
166  static abstract class CommandProvider {
167
168    enum Operation {
169      START, STOP, RESTART
170    }
171
172    public abstract String getCommand(ServiceType service, Operation op);
173
174    public String isRunningCommand(ServiceType service) {
175      return findPidCommand(service);
176    }
177
178    protected String findPidCommand(ServiceType service) {
179      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
180          service);
181    }
182
183    public String signalCommand(ServiceType service, String signal) {
184      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
185    }
186  }
187
188  /**
189   * CommandProvider to manage the service using bin/hbase-* scripts
190   */
191  static class HBaseShellCommandProvider extends CommandProvider {
192    private final String hbaseHome;
193    private final String confDir;
194
195    HBaseShellCommandProvider(Configuration conf) {
196      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
197        System.getenv("HBASE_HOME"));
198      String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
199        System.getenv("HBASE_CONF_DIR"));
200      if (tmp != null) {
201        confDir = String.format("--config %s", tmp);
202      } else {
203        confDir = "";
204      }
205    }
206
207    @Override
208    public String getCommand(ServiceType service, Operation op) {
209      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
210          op.toString().toLowerCase(Locale.ROOT), service);
211    }
212  }
213
214  /**
215   * CommandProvider to manage the service using sbin/hadoop-* scripts.
216   */
217  static class HadoopShellCommandProvider extends CommandProvider {
218    private final String hadoopHome;
219    private final String confDir;
220
221    HadoopShellCommandProvider(Configuration conf) throws IOException {
222      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home",
223          System.getenv("HADOOP_HOME"));
224      String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir",
225          System.getenv("HADOOP_CONF_DIR"));
226      if (hadoopHome == null) {
227        throw new IOException("Hadoop home configuration parameter i.e. " +
228          "'hbase.it.clustermanager.hadoop.home' is not configured properly.");
229      }
230      if (tmp != null) {
231        confDir = String.format("--config %s", tmp);
232      } else {
233        confDir = "";
234      }
235    }
236
237    @Override
238    public String getCommand(ServiceType service, Operation op) {
239      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
240          op.toString().toLowerCase(Locale.ROOT), service);
241    }
242  }
243
244  /**
245   * CommandProvider to manage the service using bin/zk* scripts.
246   */
247  static class ZookeeperShellCommandProvider extends CommandProvider {
248    private final String zookeeperHome;
249    private final String confDir;
250
251    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
252      zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home",
253          System.getenv("ZOOBINDIR"));
254      String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir",
255          System.getenv("ZOOCFGDIR"));
256      if (zookeeperHome == null) {
257        throw new IOException("ZooKeeper home configuration parameter i.e. " +
258          "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
259      }
260      if (tmp != null) {
261        confDir = String.format("--config %s", tmp);
262      } else {
263        confDir = "";
264      }
265    }
266
267    @Override
268    public String getCommand(ServiceType service, Operation op) {
269      return String.format("%s/bin/zkServer.sh %s", zookeeperHome, op.toString().toLowerCase(Locale.ROOT));
270    }
271
272    @Override
273    protected String findPidCommand(ServiceType service) {
274      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
275        service);
276    }
277  }
278
279  public HBaseClusterManager() {
280  }
281
282  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
283    switch (service) {
284      case HADOOP_DATANODE:
285        return new HadoopShellCommandProvider(getConf());
286      case ZOOKEEPER_SERVER:
287        return new ZookeeperShellCommandProvider(getConf());
288      default:
289        return new HBaseShellCommandProvider(getConf());
290    }
291  }
292
293  /**
294   * Execute the given command on the host using SSH
295   * @return pair of exit code and command output
296   * @throws IOException if something goes wrong.
297   */
298  private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
299    throws IOException {
300    LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
301
302    RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
303    try {
304      shell.execute();
305    } catch (Shell.ExitCodeException ex) {
306      // capture the stdout of the process as well.
307      String output = shell.getOutput();
308      // add output for the ExitCodeException.
309      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
310        + ", stdout: " + output);
311    }
312
313    LOG.info("Executed remote command, exit code:" + shell.getExitCode()
314        + " , output:" + shell.getOutput());
315
316    return new Pair<>(shell.getExitCode(), shell.getOutput());
317  }
318
319  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
320      throws IOException {
321    RetryCounter retryCounter = retryCounterFactory.create();
322    while (true) {
323      try {
324        return exec(hostname, service, cmd);
325      } catch (IOException e) {
326        retryOrThrow(retryCounter, e, hostname, cmd);
327      }
328      try {
329        retryCounter.sleepUntilNextRetry();
330      } catch (InterruptedException ex) {
331        // ignore
332        LOG.warn("Sleep Interrupted:" + ex);
333      }
334    }
335  }
336
337  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
338      String hostname, String[] cmd) throws E {
339    if (retryCounter.shouldRetry()) {
340      LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
341        + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
342          + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
343      return;
344    }
345    throw ex;
346  }
347
348  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
349    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
350  }
351
352  @Override
353  public void start(ServiceType service, String hostname, int port) throws IOException {
354    exec(hostname, service, Operation.START);
355  }
356
357  @Override
358  public void stop(ServiceType service, String hostname, int port) throws IOException {
359    exec(hostname, service, Operation.STOP);
360  }
361
362  @Override
363  public void restart(ServiceType service, String hostname, int port) throws IOException {
364    exec(hostname, service, Operation.RESTART);
365  }
366
367  public void signal(ServiceType service, String signal, String hostname) throws IOException {
368    execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal));
369  }
370
371  @Override
372  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
373    String ret = execWithRetries(hostname, service,
374      getCommandProvider(service).isRunningCommand(service)).getSecond();
375    return ret.length() > 0;
376  }
377
378  @Override
379  public void kill(ServiceType service, String hostname, int port) throws IOException {
380    signal(service, SIGKILL, hostname);
381  }
382
383  @Override
384  public void suspend(ServiceType service, String hostname, int port) throws IOException {
385    signal(service, SIGSTOP, hostname);
386  }
387
388  @Override
389  public void resume(ServiceType service, String hostname, int port) throws IOException {
390    signal(service, SIGCONT, hostname);
391  }
392}