/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
017package org.apache.hadoop.hbase.util;
018
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Scanner;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
054
055/**
056 * A helper class for process-based mini-cluster tests. Unlike
057 * {@link MiniHBaseCluster}, starts daemons as separate processes, allowing to
058 * do real kill testing.
059 */
060@Category({MiscTests.class, LargeTests.class})
061public class ProcessBasedLocalHBaseCluster {
062
063  private final String hbaseHome, workDir;
064  private final Configuration conf;
065  private final int numMasters, numRegionServers, numDataNodes;
066  private final List<Integer> rsPorts, masterPorts;
067
068  private final int zkClientPort;
069
070  private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;
071
072  private static final Logger LOG = LoggerFactory.getLogger(
073      ProcessBasedLocalHBaseCluster.class);
074
075  private List<String> daemonPidFiles =
076      Collections.synchronizedList(new ArrayList<String>());;
077
078  private boolean shutdownHookInstalled;
079
080  private String hbaseDaemonScript;
081
082  private MiniDFSCluster dfsCluster;
083
084  private HBaseTestingUtility testUtil;
085
086  private Thread logTailerThread;
087
088  private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());
089
090  private static enum ServerType {
091    MASTER("master"),
092    RS("regionserver"),
093    ZK("zookeeper");
094
095    private final String fullName;
096
097    private ServerType(String fullName) {
098      this.fullName = fullName;
099    }
100  }
101
102  /**
103   * Constructor. Modifies the passed configuration.
104   * @param conf the {@link Configuration} to use
105   * @param numDataNodes the number of data nodes
106   * @param numRegionServers the number of region servers
107   */
108  public ProcessBasedLocalHBaseCluster(Configuration conf,
109      int numDataNodes, int numRegionServers) {
110    this.conf = conf;
111    this.hbaseHome = HBaseHomePath.getHomePath();
112    this.numMasters = 1;
113    this.numRegionServers = numRegionServers;
114    this.workDir = hbaseHome + "/target/local_cluster";
115    this.numDataNodes = numDataNodes;
116
117    hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
118    zkClientPort = HBaseTestingUtility.randomFreePort();
119
120    this.rsPorts = sortedPorts(numRegionServers);
121    this.masterPorts = sortedPorts(numMasters);
122
123    conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
124    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
125  }
126
127  /**
128   * Makes this local HBase cluster use a mini-DFS cluster. Must be called before
129   * {@link #startHBase()}.
130   * @throws IOException
131   */
132  public void startMiniDFS() throws Exception {
133    if (testUtil == null) {
134      testUtil = new HBaseTestingUtility(conf);
135    }
136    dfsCluster = testUtil.startMiniDFSCluster(numDataNodes);
137  }
138
139  /**
140   * Generates a list of random port numbers in the sorted order. A sorted
141   * order makes sense if we ever want to refer to these servers by their index
142   * in the returned array, e.g. server #0, #1, etc.
143   */
144  private static List<Integer> sortedPorts(int n) {
145    List<Integer> ports = new ArrayList<>(n);
146    for (int i = 0; i < n; ++i) {
147      ports.add(HBaseTestingUtility.randomFreePort());
148    }
149    Collections.sort(ports);
150    return ports;
151  }
152
153  public void startHBase() throws IOException {
154    startDaemonLogTailer();
155    cleanupOldState();
156
157    // start ZK
158    LOG.info("Starting ZooKeeper on port " + zkClientPort);
159    startZK();
160
161    HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort);
162
163    for (int masterPort : masterPorts) {
164      startMaster(masterPort);
165    }
166
167    ZKUtil.waitForBaseZNode(conf);
168
169    for (int rsPort : rsPorts) {
170      startRegionServer(rsPort);
171    }
172
173    LOG.info("Waiting for HBase startup by scanning META");
174    int attemptsLeft = 10;
175    while (attemptsLeft-- > 0) {
176      try {
177        testUtil.getConnection().getTable(TableName.META_TABLE_NAME);
178      } catch (Exception e) {
179        LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft,
180            e);
181        Threads.sleep(1000);
182      }
183    }
184
185    LOG.info("Process-based HBase Cluster with " + numRegionServers +
186        " region servers up and running... \n\n");
187  }
188
189  public void startRegionServer(int port) {
190    startServer(ServerType.RS, port);
191  }
192
193  public void startMaster(int port) {
194    startServer(ServerType.MASTER, port);
195  }
196
197  public void killRegionServer(int port) throws IOException {
198    killServer(ServerType.RS, port);
199  }
200
201  public void killMaster() throws IOException {
202    killServer(ServerType.MASTER, 0);
203  }
204
205  public void startZK() {
206    startServer(ServerType.ZK, 0);
207  }
208
209  private void executeCommand(String command) {
210    executeCommand(command, null);
211  }
212
213  private void executeCommand(String command, Map<String,
214      String> envOverrides) {
215    ensureShutdownHookInstalled();
216    LOG.debug("Command : " + command);
217
218    try {
219      String [] envp = null;
220      if (envOverrides != null) {
221        Map<String, String> map = new HashMap<>(System.getenv());
222        map.putAll(envOverrides);
223        envp = new String[map.size()];
224        int idx = 0;
225        for (Map.Entry<String, String> e: map.entrySet()) {
226          envp[idx++] = e.getKey() + "=" + e.getValue();
227        }
228      }
229
230      Process p = Runtime.getRuntime().exec(command, envp);
231
232      BufferedReader stdInput = new BufferedReader(
233          new InputStreamReader(p.getInputStream()));
234      BufferedReader stdError = new BufferedReader(
235          new InputStreamReader(p.getErrorStream()));
236
237      // read the output from the command
238      String s = null;
239      while ((s = stdInput.readLine()) != null) {
240        System.out.println(s);
241      }
242
243      // read any errors from the attempted command
244      while ((s = stdError.readLine()) != null) {
245        System.out.println(s);
246      }
247    } catch (IOException e) {
248      LOG.error("Error running: " + command, e);
249    }
250  }
251
252  private void shutdownAllProcesses() {
253    LOG.info("Killing daemons using pid files");
254    final List<String> pidFiles = new ArrayList<>(daemonPidFiles);
255    for (String pidFile : pidFiles) {
256      int pid = 0;
257      try {
258        pid = readPidFromFile(pidFile);
259      } catch (IOException ex) {
260        LOG.error("Could not read pid from file " + pidFile);
261      }
262
263      if (pid > 0) {
264        LOG.info("Killing pid " + pid + " (" + pidFile + ")");
265        killProcess(pid);
266      }
267    }
268  }
269
270  private void ensureShutdownHookInstalled() {
271    if (shutdownHookInstalled) {
272      return;
273    }
274
275    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
276      @Override
277      public void run() {
278        shutdownAllProcesses();
279      }
280    }));
281
282    shutdownHookInstalled = true;
283  }
284
285  private void cleanupOldState() {
286    executeCommand("rm -rf " + workDir);
287  }
288
289  private void writeStringToFile(String s, String fileName) {
290    try {
291      BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
292      out.write(s);
293      out.close();
294    } catch (IOException e) {
295      LOG.error("Error writing to: " + fileName, e);
296    }
297  }
298
299  private String serverWorkingDir(ServerType serverType, int port) {
300    return workDir + "/" + serverType + "-" + port;
301  }
302
303  private int getServerPID(ServerType serverType, int port) throws IOException {
304    String pidFile = pidFilePath(serverType, port);
305    return readPidFromFile(pidFile);
306  }
307
308  private static int readPidFromFile(String pidFile) throws IOException {
309    Scanner scanner = new Scanner(new File(pidFile));
310    try {
311      return scanner.nextInt();
312    } finally {
313      scanner.close();
314    }
315  }
316
317  private String pidFilePath(ServerType serverType, int port) {
318    String dir = serverWorkingDir(serverType, port);
319    String user = System.getenv("USER");
320    String pidFile = String.format("%s/hbase-%s-%s.pid",
321                                   dir, user, serverType.fullName);
322    return pidFile;
323  }
324
325  private void killServer(ServerType serverType, int port) throws IOException {
326    int pid = getServerPID(serverType, port);
327    if (pid > 0) {
328      LOG.info("Killing " + serverType + "; pid=" + pid);
329      killProcess(pid);
330    }
331  }
332
333  private void killProcess(int pid) {
334    String cmd = "kill -s KILL " + pid;
335    executeCommand(cmd);
336  }
337
338  private void startServer(ServerType serverType, int rsPort) {
339    // create working directory for this region server.
340    String dir = serverWorkingDir(serverType, rsPort);
341    String confStr = generateConfig(serverType, rsPort, dir);
342    LOG.debug("Creating directory " + dir);
343    new File(dir).mkdirs();
344
345    writeStringToFile(confStr, dir + "/hbase-site.xml");
346
347    // Set debug options to an empty string so that hbase-config.sh does not configure them
348    // using default ports. If we want to run remote debugging on process-based local cluster's
349    // daemons, we can automatically choose non-conflicting JDWP and JMX ports for each daemon
350    // and specify them here.
351    writeStringToFile(
352        "unset HBASE_MASTER_OPTS\n" +
353        "unset HBASE_REGIONSERVER_OPTS\n" +
354        "unset HBASE_ZOOKEEPER_OPTS\n" +
355        "HBASE_MASTER_DBG_OPTS=' '\n" +
356        "HBASE_REGIONSERVER_DBG_OPTS=' '\n" +
357        "HBASE_ZOOKEEPER_DBG_OPTS=' '\n" +
358        "HBASE_MASTER_JMX_OPTS=' '\n" +
359        "HBASE_REGIONSERVER_JMX_OPTS=' '\n" +
360        "HBASE_ZOOKEEPER_JMX_OPTS=' '\n",
361        dir + "/hbase-env.sh");
362
363    Map<String, String> envOverrides = new HashMap<>();
364    envOverrides.put("HBASE_LOG_DIR", dir);
365    envOverrides.put("HBASE_PID_DIR", dir);
366    try {
367      FileUtils.copyFile(
368          new File(hbaseHome, "conf/log4j.properties"),
369          new File(dir, "log4j.properties"));
370    } catch (IOException ex) {
371      LOG.error("Could not install log4j.properties into " + dir);
372    }
373
374    executeCommand(hbaseDaemonScript + " --config " + dir +
375                   " start " + serverType.fullName, envOverrides);
376    daemonPidFiles.add(pidFilePath(serverType, rsPort));
377    logTailDirs.add(dir);
378  }
379
380  private final String generateConfig(ServerType serverType, int rpcPort,
381      String daemonDir) {
382    StringBuilder sb = new StringBuilder();
383    Map<String, Object> confMap = new TreeMap<>();
384    confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);
385
386    if (serverType == ServerType.MASTER) {
387      confMap.put(HConstants.MASTER_PORT, rpcPort);
388
389      int masterInfoPort = HBaseTestingUtility.randomFreePort();
390      reportWebUIPort("master", masterInfoPort);
391      confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort);
392    } else if (serverType == ServerType.RS) {
393      confMap.put(HConstants.REGIONSERVER_PORT, rpcPort);
394
395      int rsInfoPort = HBaseTestingUtility.randomFreePort();
396      reportWebUIPort("region server", rsInfoPort);
397      confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort);
398    } else {
399      confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir);
400    }
401
402    confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
403    confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);
404
405    if (dfsCluster != null) {
406      String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort();
407      confMap.put("fs.defaultFS", fsURL);
408      confMap.put("hbase.rootdir", fsURL + "/hbase_test");
409    }
410
411    sb.append("<configuration>\n");
412    for (Map.Entry<String, Object> entry : confMap.entrySet()) {
413      sb.append("  <property>\n");
414      sb.append("    <name>" + entry.getKey() + "</name>\n");
415      sb.append("    <value>" + entry.getValue() + "</value>\n");
416      sb.append("  </property>\n");
417    }
418    sb.append("</configuration>\n");
419    return sb.toString();
420  }
421
422  private static void reportWebUIPort(String daemon, int port) {
423    LOG.info("Local " + daemon + " web UI is at http://"
424        + HConstants.LOCALHOST + ":" + port);
425  }
426
427  public Configuration getConf() {
428    return conf;
429  }
430
431  public void shutdown() {
432    if (dfsCluster != null) {
433      dfsCluster.shutdown();
434    }
435    shutdownAllProcesses();
436  }
437
438  private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE =
439      Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");
440
441  private static final Pattern LOG_PATH_FORMAT_RE =
442      Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");
443
444  private static String processLine(String line) {
445    Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line);
446    return m.replaceAll("");
447  }
448
449  private final class LocalDaemonLogTailer implements Runnable {
450    private final Set<String> tailedFiles = new HashSet<>();
451    private final List<String> dirList = new ArrayList<>();
452    private final Object printLock = new Object();
453
454    private final FilenameFilter LOG_FILES = new FilenameFilter() {
455      @Override
456      public boolean accept(File dir, String name) {
457        return name.endsWith(".out") || name.endsWith(".log");
458      }
459    };
460
461    @Override
462    public void run() {
463      try {
464        runInternal();
465      } catch (IOException ex) {
466        LOG.error(ex.toString(), ex);
467      }
468    }
469
470    private void runInternal() throws IOException {
471      Thread.currentThread().setName(getClass().getSimpleName());
472      while (true) {
473        scanDirs();
474        try {
475          Thread.sleep(500);
476        } catch (InterruptedException e) {
477          LOG.error("Log tailer thread interrupted", e);
478          break;
479        }
480      }
481    }
482
483    private void scanDirs() throws FileNotFoundException {
484      dirList.clear();
485      dirList.addAll(logTailDirs);
486      for (String d : dirList) {
487        for (File f : new File(d).listFiles(LOG_FILES)) {
488          String filePath = f.getAbsolutePath();
489          if (!tailedFiles.contains(filePath)) {
490            tailedFiles.add(filePath);
491            startTailingFile(filePath);
492          }
493        }
494      }
495    }
496
497    private void startTailingFile(final String filePath) throws FileNotFoundException {
498      final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
499      final ServerType serverType;
500      final int serverPort;
501      Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
502      if (m.matches()) {
503        serverType = ServerType.valueOf(m.group(1));
504        serverPort = Integer.valueOf(m.group(2));
505      } else {
506        LOG.error("Unrecognized log path format: " + filePath);
507        return;
508      }
509      final String logMsgPrefix =
510          "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] ";
511
512      LOG.debug("Tailing " + filePath);
513      Thread t = new Thread(new Runnable() {
514        @Override
515        public void run() {
516          try {
517            FileInputStream fis = new FileInputStream(filePath);
518            BufferedReader br = new BufferedReader(new InputStreamReader(fis));
519            String line;
520            while (true) {
521              try {
522                Thread.sleep(200);
523              } catch (InterruptedException e) {
524                LOG.error("Tailer for " + filePath + " interrupted");
525                break;
526              }
527              while ((line = br.readLine()) != null) {
528                line = logMsgPrefix + processLine(line);
529                synchronized (printLock) {
530                  if (line.endsWith("\n")) {
531                    dest.print(line);
532                  } else {
533                    dest.println(line);
534                  }
535                  dest.flush();
536                }
537              }
538            }
539          } catch (IOException ex) {
540            LOG.error("Failed tailing " + filePath, ex);
541          }
542        }
543      });
544      t.setDaemon(true);
545      t.setName("Tailer for " + filePath);
546      t.start();
547    }
548
549  }
550
551  private void startDaemonLogTailer() {
552    logTailerThread = new Thread(new LocalDaemonLogTailer());
553    logTailerThread.setDaemon(true);
554    logTailerThread.start();
555  }
556
557}
558