001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.BufferedReader;
021import java.io.BufferedWriter;
022import java.io.File;
023import java.io.FileInputStream;
024import java.io.FileNotFoundException;
025import java.io.FileWriter;
026import java.io.FilenameFilter;
027import java.io.IOException;
028import java.io.InputStreamReader;
029import java.io.PrintStream;
030import java.util.ArrayList;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.List;
035import java.util.Map;
036import java.util.Scanner;
037import java.util.Set;
038import java.util.TreeMap;
039import java.util.regex.Matcher;
040import java.util.regex.Pattern;
041import org.apache.commons.io.FileUtils;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.zookeeper.ZKUtil;
048import org.apache.hadoop.hdfs.MiniDFSCluster;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * A helper class for process-based mini-cluster tests. Unlike {@link SingleProcessHBaseCluster},
054 * starts daemons as separate processes, allowing to do real kill testing.
055 */
056public class ProcessBasedLocalHBaseCluster {
057
058  private final String hbaseHome, workDir;
059  private final Configuration conf;
060  private final int numMasters, numRegionServers, numDataNodes;
061  private final List<Integer> rsPorts, masterPorts;
062
063  private final int zkClientPort;
064
065  private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;
066
067  private static final Logger LOG = LoggerFactory.getLogger(ProcessBasedLocalHBaseCluster.class);
068
069  private List<String> daemonPidFiles = Collections.synchronizedList(new ArrayList<String>());
070
071  private boolean shutdownHookInstalled;
072
073  private String hbaseDaemonScript;
074
075  private MiniDFSCluster dfsCluster;
076
077  private HBaseTestingUtil testUtil;
078
079  private Thread logTailerThread;
080
081  private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());
082
083  private static enum ServerType {
084    MASTER("master"),
085    RS("regionserver"),
086    ZK("zookeeper");
087
088    private final String fullName;
089
090    private ServerType(String fullName) {
091      this.fullName = fullName;
092    }
093  }
094
095  /**
096   * Constructor. Modifies the passed configuration.
097   * @param conf             the {@link Configuration} to use
098   * @param numDataNodes     the number of data nodes
099   * @param numRegionServers the number of region servers
100   */
101  public ProcessBasedLocalHBaseCluster(Configuration conf, int numDataNodes, int numRegionServers) {
102    this.conf = conf;
103    this.hbaseHome = HBaseHomePath.getHomePath();
104    this.numMasters = 1;
105    this.numRegionServers = numRegionServers;
106    this.workDir = hbaseHome + "/target/local_cluster";
107    this.numDataNodes = numDataNodes;
108
109    hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
110    zkClientPort = HBaseTestingUtil.randomFreePort();
111
112    this.rsPorts = sortedPorts(numRegionServers);
113    this.masterPorts = sortedPorts(numMasters);
114
115    conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
116    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
117  }
118
119  /**
120   * Makes this local HBase cluster use a mini-DFS cluster. Must be called before
121   * {@link #startHBase()}.
122   */
123  public void startMiniDFS() throws Exception {
124    if (testUtil == null) {
125      testUtil = new HBaseTestingUtil(conf);
126    }
127    dfsCluster = testUtil.startMiniDFSCluster(numDataNodes);
128  }
129
130  /**
131   * Generates a list of random port numbers in the sorted order. A sorted order makes sense if we
132   * ever want to refer to these servers by their index in the returned array, e.g. server #0, #1,
133   * etc.
134   */
135  private static List<Integer> sortedPorts(int n) {
136    List<Integer> ports = new ArrayList<>(n);
137    for (int i = 0; i < n; ++i) {
138      ports.add(HBaseTestingUtil.randomFreePort());
139    }
140    Collections.sort(ports);
141    return ports;
142  }
143
144  public void startHBase() throws IOException {
145    startDaemonLogTailer();
146    cleanupOldState();
147
148    // start ZK
149    LOG.info("Starting ZooKeeper on port " + zkClientPort);
150    startZK();
151
152    HBaseTestingUtil.waitForHostPort(HConstants.LOCALHOST, zkClientPort);
153
154    for (int masterPort : masterPorts) {
155      startMaster(masterPort);
156    }
157
158    ZKUtil.waitForBaseZNode(conf);
159
160    for (int rsPort : rsPorts) {
161      startRegionServer(rsPort);
162    }
163
164    LOG.info("Waiting for HBase startup by scanning META");
165    int attemptsLeft = 10;
166    while (attemptsLeft-- > 0) {
167      try {
168        testUtil.getConnection().getTable(TableName.META_TABLE_NAME);
169      } catch (Exception e) {
170        LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft, e);
171        Threads.sleep(1000);
172      }
173    }
174
175    LOG.info("Process-based HBase Cluster with " + numRegionServers
176      + " region servers up and running... \n\n");
177  }
178
179  public void startRegionServer(int port) {
180    startServer(ServerType.RS, port);
181  }
182
183  public void startMaster(int port) {
184    startServer(ServerType.MASTER, port);
185  }
186
187  public void killRegionServer(int port) throws IOException {
188    killServer(ServerType.RS, port);
189  }
190
191  public void killMaster() throws IOException {
192    killServer(ServerType.MASTER, 0);
193  }
194
195  public void startZK() {
196    startServer(ServerType.ZK, 0);
197  }
198
199  private void executeCommand(String command) {
200    executeCommand(command, null);
201  }
202
203  private void executeCommand(String command, Map<String, String> envOverrides) {
204    ensureShutdownHookInstalled();
205    LOG.debug("Command : " + command);
206
207    try {
208      String[] envp = null;
209      if (envOverrides != null) {
210        Map<String, String> map = new HashMap<>(System.getenv());
211        map.putAll(envOverrides);
212        envp = new String[map.size()];
213        int idx = 0;
214        for (Map.Entry<String, String> e : map.entrySet()) {
215          envp[idx++] = e.getKey() + "=" + e.getValue();
216        }
217      }
218
219      Process p = Runtime.getRuntime().exec(command, envp);
220
221      BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
222      BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream()));
223
224      // read the output from the command
225      String s = null;
226      while ((s = stdInput.readLine()) != null) {
227        System.out.println(s);
228      }
229
230      // read any errors from the attempted command
231      while ((s = stdError.readLine()) != null) {
232        System.out.println(s);
233      }
234    } catch (IOException e) {
235      LOG.error("Error running: " + command, e);
236    }
237  }
238
239  private void shutdownAllProcesses() {
240    LOG.info("Killing daemons using pid files");
241    final List<String> pidFiles = new ArrayList<>(daemonPidFiles);
242    for (String pidFile : pidFiles) {
243      int pid = 0;
244      try {
245        pid = readPidFromFile(pidFile);
246      } catch (IOException ex) {
247        LOG.error("Could not read pid from file " + pidFile);
248      }
249
250      if (pid > 0) {
251        LOG.info("Killing pid " + pid + " (" + pidFile + ")");
252        killProcess(pid);
253      }
254    }
255  }
256
257  private void ensureShutdownHookInstalled() {
258    if (shutdownHookInstalled) {
259      return;
260    }
261
262    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
263      @Override
264      public void run() {
265        shutdownAllProcesses();
266      }
267    }));
268
269    shutdownHookInstalled = true;
270  }
271
272  private void cleanupOldState() {
273    executeCommand("rm -rf " + workDir);
274  }
275
276  private void writeStringToFile(String s, String fileName) {
277    try {
278      BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
279      out.write(s);
280      out.close();
281    } catch (IOException e) {
282      LOG.error("Error writing to: " + fileName, e);
283    }
284  }
285
286  private String serverWorkingDir(ServerType serverType, int port) {
287    return workDir + "/" + serverType + "-" + port;
288  }
289
290  private int getServerPID(ServerType serverType, int port) throws IOException {
291    String pidFile = pidFilePath(serverType, port);
292    return readPidFromFile(pidFile);
293  }
294
295  private static int readPidFromFile(String pidFile) throws IOException {
296    Scanner scanner = new Scanner(new File(pidFile));
297    try {
298      return scanner.nextInt();
299    } finally {
300      scanner.close();
301    }
302  }
303
304  private String pidFilePath(ServerType serverType, int port) {
305    String dir = serverWorkingDir(serverType, port);
306    String user = System.getenv("USER");
307    String pidFile = String.format("%s/hbase-%s-%s.pid", dir, user, serverType.fullName);
308    return pidFile;
309  }
310
311  private void killServer(ServerType serverType, int port) throws IOException {
312    int pid = getServerPID(serverType, port);
313    if (pid > 0) {
314      LOG.info("Killing " + serverType + "; pid=" + pid);
315      killProcess(pid);
316    }
317  }
318
319  private void killProcess(int pid) {
320    String cmd = "kill -s KILL " + pid;
321    executeCommand(cmd);
322  }
323
324  private void startServer(ServerType serverType, int rsPort) {
325    // create working directory for this region server.
326    String dir = serverWorkingDir(serverType, rsPort);
327    String confStr = generateConfig(serverType, rsPort, dir);
328    LOG.debug("Creating directory " + dir);
329    new File(dir).mkdirs();
330
331    writeStringToFile(confStr, dir + "/hbase-site.xml");
332
333    // Set debug options to an empty string so that hbase-config.sh does not configure them
334    // using default ports. If we want to run remote debugging on process-based local cluster's
335    // daemons, we can automatically choose non-conflicting JDWP and JMX ports for each daemon
336    // and specify them here.
337    writeStringToFile("unset HBASE_MASTER_OPTS\n" + "unset HBASE_REGIONSERVER_OPTS\n"
338      + "unset HBASE_ZOOKEEPER_OPTS\n" + "HBASE_MASTER_DBG_OPTS=' '\n"
339      + "HBASE_REGIONSERVER_DBG_OPTS=' '\n" + "HBASE_ZOOKEEPER_DBG_OPTS=' '\n"
340      + "HBASE_MASTER_JMX_OPTS=' '\n" + "HBASE_REGIONSERVER_JMX_OPTS=' '\n"
341      + "HBASE_ZOOKEEPER_JMX_OPTS=' '\n", dir + "/hbase-env.sh");
342
343    Map<String, String> envOverrides = new HashMap<>();
344    envOverrides.put("HBASE_LOG_DIR", dir);
345    envOverrides.put("HBASE_PID_DIR", dir);
346    try {
347      FileUtils.copyFile(new File(hbaseHome, "conf/log4j.properties"),
348        new File(dir, "log4j.properties"));
349    } catch (IOException ex) {
350      LOG.error("Could not install log4j.properties into " + dir);
351    }
352
353    executeCommand(hbaseDaemonScript + " --config " + dir + " start " + serverType.fullName,
354      envOverrides);
355    daemonPidFiles.add(pidFilePath(serverType, rsPort));
356    logTailDirs.add(dir);
357  }
358
359  private final String generateConfig(ServerType serverType, int rpcPort, String daemonDir) {
360    StringBuilder sb = new StringBuilder();
361    Map<String, Object> confMap = new TreeMap<>();
362    confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);
363
364    if (serverType == ServerType.MASTER) {
365      confMap.put(HConstants.MASTER_PORT, rpcPort);
366
367      int masterInfoPort = HBaseTestingUtil.randomFreePort();
368      reportWebUIPort("master", masterInfoPort);
369      confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort);
370    } else if (serverType == ServerType.RS) {
371      confMap.put(HConstants.REGIONSERVER_PORT, rpcPort);
372
373      int rsInfoPort = HBaseTestingUtil.randomFreePort();
374      reportWebUIPort("region server", rsInfoPort);
375      confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort);
376    } else {
377      confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir);
378    }
379
380    confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
381    confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);
382
383    if (dfsCluster != null) {
384      String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort();
385      confMap.put("fs.defaultFS", fsURL);
386      confMap.put("hbase.rootdir", fsURL + "/hbase_test");
387    }
388
389    sb.append("<configuration>\n");
390    for (Map.Entry<String, Object> entry : confMap.entrySet()) {
391      sb.append("  <property>\n");
392      sb.append("    <name>" + entry.getKey() + "</name>\n");
393      sb.append("    <value>" + entry.getValue() + "</value>\n");
394      sb.append("  </property>\n");
395    }
396    sb.append("</configuration>\n");
397    return sb.toString();
398  }
399
400  private static void reportWebUIPort(String daemon, int port) {
401    LOG.info("Local " + daemon + " web UI is at http://" + HConstants.LOCALHOST + ":" + port);
402  }
403
404  public Configuration getConf() {
405    return conf;
406  }
407
408  public void shutdown() {
409    if (dfsCluster != null) {
410      dfsCluster.shutdown();
411    }
412    shutdownAllProcesses();
413  }
414
415  private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE =
416    Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");
417
418  private static final Pattern LOG_PATH_FORMAT_RE = Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");
419
420  private static String processLine(String line) {
421    Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line);
422    return m.replaceAll("");
423  }
424
425  private final class LocalDaemonLogTailer implements Runnable {
426    private final Set<String> tailedFiles = new HashSet<>();
427    private final List<String> dirList = new ArrayList<>();
428    private final Object printLock = new Object();
429
430    private final FilenameFilter LOG_FILES = new FilenameFilter() {
431      @Override
432      public boolean accept(File dir, String name) {
433        return name.endsWith(".out") || name.endsWith(".log");
434      }
435    };
436
437    @Override
438    public void run() {
439      try {
440        runInternal();
441      } catch (IOException ex) {
442        LOG.error(ex.toString(), ex);
443      }
444    }
445
446    private void runInternal() throws IOException {
447      Thread.currentThread().setName(getClass().getSimpleName());
448      while (true) {
449        scanDirs();
450        try {
451          Thread.sleep(500);
452        } catch (InterruptedException e) {
453          LOG.error("Log tailer thread interrupted", e);
454          break;
455        }
456      }
457    }
458
459    private void scanDirs() throws FileNotFoundException {
460      dirList.clear();
461      dirList.addAll(logTailDirs);
462      for (String d : dirList) {
463        for (File f : new File(d).listFiles(LOG_FILES)) {
464          String filePath = f.getAbsolutePath();
465          if (!tailedFiles.contains(filePath)) {
466            tailedFiles.add(filePath);
467            startTailingFile(filePath);
468          }
469        }
470      }
471    }
472
473    private void startTailingFile(final String filePath) throws FileNotFoundException {
474      final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
475      final ServerType serverType;
476      final int serverPort;
477      Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
478      if (m.matches()) {
479        serverType = ServerType.valueOf(m.group(1));
480        serverPort = Integer.valueOf(m.group(2));
481      } else {
482        LOG.error("Unrecognized log path format: " + filePath);
483        return;
484      }
485      final String logMsgPrefix =
486        "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] ";
487
488      LOG.debug("Tailing " + filePath);
489      Thread t = new Thread(new Runnable() {
490        @Override
491        public void run() {
492          try {
493            FileInputStream fis = new FileInputStream(filePath);
494            BufferedReader br = new BufferedReader(new InputStreamReader(fis));
495            String line;
496            while (true) {
497              try {
498                Thread.sleep(200);
499              } catch (InterruptedException e) {
500                LOG.error("Tailer for " + filePath + " interrupted");
501                break;
502              }
503              while ((line = br.readLine()) != null) {
504                line = logMsgPrefix + processLine(line);
505                synchronized (printLock) {
506                  if (line.endsWith("\n")) {
507                    dest.print(line);
508                  } else {
509                    dest.println(line);
510                  }
511                  dest.flush();
512                }
513              }
514            }
515          } catch (IOException ex) {
516            LOG.error("Failed tailing " + filePath, ex);
517          }
518        }
519      });
520      t.setDaemon(true);
521      t.setName("Tailer for " + filePath);
522      t.start();
523    }
524
525  }
526
527  private void startDaemonLogTailer() {
528    logTailerThread = new Thread(new LocalDaemonLogTailer());
529    logTailerThread.setDaemon(true);
530    logTailerThread.start();
531  }
532
533}