001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.util;
018
019import java.io.BufferedReader;
020import java.io.BufferedWriter;
021import java.io.File;
022import java.io.FileInputStream;
023import java.io.FileNotFoundException;
024import java.io.FileWriter;
025import java.io.FilenameFilter;
026import java.io.IOException;
027import java.io.InputStreamReader;
028import java.io.PrintStream;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.Scanner;
036import java.util.Set;
037import java.util.TreeMap;
038import java.util.regex.Matcher;
039import java.util.regex.Pattern;
040import org.apache.commons.io.FileUtils;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.zookeeper.ZKUtil;
047import org.apache.hadoop.hdfs.MiniDFSCluster;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * A helper class for process-based mini-cluster tests. Unlike
053 * {@link SingleProcessHBaseCluster}, starts daemons as separate processes, allowing to
054 * do real kill testing.
055 */
056public class ProcessBasedLocalHBaseCluster {
057
058  private final String hbaseHome, workDir;
059  private final Configuration conf;
060  private final int numMasters, numRegionServers, numDataNodes;
061  private final List<Integer> rsPorts, masterPorts;
062
063  private final int zkClientPort;
064
065  private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;
066
067  private static final Logger LOG = LoggerFactory.getLogger(
068      ProcessBasedLocalHBaseCluster.class);
069
070  private List<String> daemonPidFiles =
071      Collections.synchronizedList(new ArrayList<String>());
072
073  private boolean shutdownHookInstalled;
074
075  private String hbaseDaemonScript;
076
077  private MiniDFSCluster dfsCluster;
078
079  private HBaseTestingUtil testUtil;
080
081  private Thread logTailerThread;
082
083  private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());
084
085  private static enum ServerType {
086    MASTER("master"),
087    RS("regionserver"),
088    ZK("zookeeper");
089
090    private final String fullName;
091
092    private ServerType(String fullName) {
093      this.fullName = fullName;
094    }
095  }
096
097  /**
098   * Constructor. Modifies the passed configuration.
099   * @param conf the {@link Configuration} to use
100   * @param numDataNodes the number of data nodes
101   * @param numRegionServers the number of region servers
102   */
103  public ProcessBasedLocalHBaseCluster(Configuration conf,
104      int numDataNodes, int numRegionServers) {
105    this.conf = conf;
106    this.hbaseHome = HBaseHomePath.getHomePath();
107    this.numMasters = 1;
108    this.numRegionServers = numRegionServers;
109    this.workDir = hbaseHome + "/target/local_cluster";
110    this.numDataNodes = numDataNodes;
111
112    hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
113    zkClientPort = HBaseTestingUtil.randomFreePort();
114
115    this.rsPorts = sortedPorts(numRegionServers);
116    this.masterPorts = sortedPorts(numMasters);
117
118    conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
119    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
120  }
121
122  /**
123   * Makes this local HBase cluster use a mini-DFS cluster. Must be called before
124   * {@link #startHBase()}.
125   * @throws IOException
126   */
127  public void startMiniDFS() throws Exception {
128    if (testUtil == null) {
129      testUtil = new HBaseTestingUtil(conf);
130    }
131    dfsCluster = testUtil.startMiniDFSCluster(numDataNodes);
132  }
133
134  /**
135   * Generates a list of random port numbers in the sorted order. A sorted
136   * order makes sense if we ever want to refer to these servers by their index
137   * in the returned array, e.g. server #0, #1, etc.
138   */
139  private static List<Integer> sortedPorts(int n) {
140    List<Integer> ports = new ArrayList<>(n);
141    for (int i = 0; i < n; ++i) {
142      ports.add(HBaseTestingUtil.randomFreePort());
143    }
144    Collections.sort(ports);
145    return ports;
146  }
147
148  public void startHBase() throws IOException {
149    startDaemonLogTailer();
150    cleanupOldState();
151
152    // start ZK
153    LOG.info("Starting ZooKeeper on port " + zkClientPort);
154    startZK();
155
156    HBaseTestingUtil.waitForHostPort(HConstants.LOCALHOST, zkClientPort);
157
158    for (int masterPort : masterPorts) {
159      startMaster(masterPort);
160    }
161
162    ZKUtil.waitForBaseZNode(conf);
163
164    for (int rsPort : rsPorts) {
165      startRegionServer(rsPort);
166    }
167
168    LOG.info("Waiting for HBase startup by scanning META");
169    int attemptsLeft = 10;
170    while (attemptsLeft-- > 0) {
171      try {
172        testUtil.getConnection().getTable(TableName.META_TABLE_NAME);
173      } catch (Exception e) {
174        LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft,
175            e);
176        Threads.sleep(1000);
177      }
178    }
179
180    LOG.info("Process-based HBase Cluster with " + numRegionServers +
181        " region servers up and running... \n\n");
182  }
183
184  public void startRegionServer(int port) {
185    startServer(ServerType.RS, port);
186  }
187
188  public void startMaster(int port) {
189    startServer(ServerType.MASTER, port);
190  }
191
192  public void killRegionServer(int port) throws IOException {
193    killServer(ServerType.RS, port);
194  }
195
196  public void killMaster() throws IOException {
197    killServer(ServerType.MASTER, 0);
198  }
199
200  public void startZK() {
201    startServer(ServerType.ZK, 0);
202  }
203
204  private void executeCommand(String command) {
205    executeCommand(command, null);
206  }
207
208  private void executeCommand(String command, Map<String,
209      String> envOverrides) {
210    ensureShutdownHookInstalled();
211    LOG.debug("Command : " + command);
212
213    try {
214      String [] envp = null;
215      if (envOverrides != null) {
216        Map<String, String> map = new HashMap<>(System.getenv());
217        map.putAll(envOverrides);
218        envp = new String[map.size()];
219        int idx = 0;
220        for (Map.Entry<String, String> e: map.entrySet()) {
221          envp[idx++] = e.getKey() + "=" + e.getValue();
222        }
223      }
224
225      Process p = Runtime.getRuntime().exec(command, envp);
226
227      BufferedReader stdInput = new BufferedReader(
228          new InputStreamReader(p.getInputStream()));
229      BufferedReader stdError = new BufferedReader(
230          new InputStreamReader(p.getErrorStream()));
231
232      // read the output from the command
233      String s = null;
234      while ((s = stdInput.readLine()) != null) {
235        System.out.println(s);
236      }
237
238      // read any errors from the attempted command
239      while ((s = stdError.readLine()) != null) {
240        System.out.println(s);
241      }
242    } catch (IOException e) {
243      LOG.error("Error running: " + command, e);
244    }
245  }
246
247  private void shutdownAllProcesses() {
248    LOG.info("Killing daemons using pid files");
249    final List<String> pidFiles = new ArrayList<>(daemonPidFiles);
250    for (String pidFile : pidFiles) {
251      int pid = 0;
252      try {
253        pid = readPidFromFile(pidFile);
254      } catch (IOException ex) {
255        LOG.error("Could not read pid from file " + pidFile);
256      }
257
258      if (pid > 0) {
259        LOG.info("Killing pid " + pid + " (" + pidFile + ")");
260        killProcess(pid);
261      }
262    }
263  }
264
265  private void ensureShutdownHookInstalled() {
266    if (shutdownHookInstalled) {
267      return;
268    }
269
270    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
271      @Override
272      public void run() {
273        shutdownAllProcesses();
274      }
275    }));
276
277    shutdownHookInstalled = true;
278  }
279
280  private void cleanupOldState() {
281    executeCommand("rm -rf " + workDir);
282  }
283
284  private void writeStringToFile(String s, String fileName) {
285    try {
286      BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
287      out.write(s);
288      out.close();
289    } catch (IOException e) {
290      LOG.error("Error writing to: " + fileName, e);
291    }
292  }
293
294  private String serverWorkingDir(ServerType serverType, int port) {
295    return workDir + "/" + serverType + "-" + port;
296  }
297
298  private int getServerPID(ServerType serverType, int port) throws IOException {
299    String pidFile = pidFilePath(serverType, port);
300    return readPidFromFile(pidFile);
301  }
302
303  private static int readPidFromFile(String pidFile) throws IOException {
304    Scanner scanner = new Scanner(new File(pidFile));
305    try {
306      return scanner.nextInt();
307    } finally {
308      scanner.close();
309    }
310  }
311
312  private String pidFilePath(ServerType serverType, int port) {
313    String dir = serverWorkingDir(serverType, port);
314    String user = System.getenv("USER");
315    String pidFile = String.format("%s/hbase-%s-%s.pid",
316                                   dir, user, serverType.fullName);
317    return pidFile;
318  }
319
320  private void killServer(ServerType serverType, int port) throws IOException {
321    int pid = getServerPID(serverType, port);
322    if (pid > 0) {
323      LOG.info("Killing " + serverType + "; pid=" + pid);
324      killProcess(pid);
325    }
326  }
327
328  private void killProcess(int pid) {
329    String cmd = "kill -s KILL " + pid;
330    executeCommand(cmd);
331  }
332
333  private void startServer(ServerType serverType, int rsPort) {
334    // create working directory for this region server.
335    String dir = serverWorkingDir(serverType, rsPort);
336    String confStr = generateConfig(serverType, rsPort, dir);
337    LOG.debug("Creating directory " + dir);
338    new File(dir).mkdirs();
339
340    writeStringToFile(confStr, dir + "/hbase-site.xml");
341
342    // Set debug options to an empty string so that hbase-config.sh does not configure them
343    // using default ports. If we want to run remote debugging on process-based local cluster's
344    // daemons, we can automatically choose non-conflicting JDWP and JMX ports for each daemon
345    // and specify them here.
346    writeStringToFile(
347        "unset HBASE_MASTER_OPTS\n" +
348        "unset HBASE_REGIONSERVER_OPTS\n" +
349        "unset HBASE_ZOOKEEPER_OPTS\n" +
350        "HBASE_MASTER_DBG_OPTS=' '\n" +
351        "HBASE_REGIONSERVER_DBG_OPTS=' '\n" +
352        "HBASE_ZOOKEEPER_DBG_OPTS=' '\n" +
353        "HBASE_MASTER_JMX_OPTS=' '\n" +
354        "HBASE_REGIONSERVER_JMX_OPTS=' '\n" +
355        "HBASE_ZOOKEEPER_JMX_OPTS=' '\n",
356        dir + "/hbase-env.sh");
357
358    Map<String, String> envOverrides = new HashMap<>();
359    envOverrides.put("HBASE_LOG_DIR", dir);
360    envOverrides.put("HBASE_PID_DIR", dir);
361    try {
362      FileUtils.copyFile(
363          new File(hbaseHome, "conf/log4j.properties"),
364          new File(dir, "log4j.properties"));
365    } catch (IOException ex) {
366      LOG.error("Could not install log4j.properties into " + dir);
367    }
368
369    executeCommand(hbaseDaemonScript + " --config " + dir +
370                   " start " + serverType.fullName, envOverrides);
371    daemonPidFiles.add(pidFilePath(serverType, rsPort));
372    logTailDirs.add(dir);
373  }
374
375  private final String generateConfig(ServerType serverType, int rpcPort,
376      String daemonDir) {
377    StringBuilder sb = new StringBuilder();
378    Map<String, Object> confMap = new TreeMap<>();
379    confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);
380
381    if (serverType == ServerType.MASTER) {
382      confMap.put(HConstants.MASTER_PORT, rpcPort);
383
384      int masterInfoPort = HBaseTestingUtil.randomFreePort();
385      reportWebUIPort("master", masterInfoPort);
386      confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort);
387    } else if (serverType == ServerType.RS) {
388      confMap.put(HConstants.REGIONSERVER_PORT, rpcPort);
389
390      int rsInfoPort = HBaseTestingUtil.randomFreePort();
391      reportWebUIPort("region server", rsInfoPort);
392      confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort);
393    } else {
394      confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir);
395    }
396
397    confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
398    confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);
399
400    if (dfsCluster != null) {
401      String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort();
402      confMap.put("fs.defaultFS", fsURL);
403      confMap.put("hbase.rootdir", fsURL + "/hbase_test");
404    }
405
406    sb.append("<configuration>\n");
407    for (Map.Entry<String, Object> entry : confMap.entrySet()) {
408      sb.append("  <property>\n");
409      sb.append("    <name>" + entry.getKey() + "</name>\n");
410      sb.append("    <value>" + entry.getValue() + "</value>\n");
411      sb.append("  </property>\n");
412    }
413    sb.append("</configuration>\n");
414    return sb.toString();
415  }
416
417  private static void reportWebUIPort(String daemon, int port) {
418    LOG.info("Local " + daemon + " web UI is at http://"
419        + HConstants.LOCALHOST + ":" + port);
420  }
421
422  public Configuration getConf() {
423    return conf;
424  }
425
426  public void shutdown() {
427    if (dfsCluster != null) {
428      dfsCluster.shutdown();
429    }
430    shutdownAllProcesses();
431  }
432
433  private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE =
434      Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");
435
436  private static final Pattern LOG_PATH_FORMAT_RE =
437      Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");
438
439  private static String processLine(String line) {
440    Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line);
441    return m.replaceAll("");
442  }
443
444  private final class LocalDaemonLogTailer implements Runnable {
445    private final Set<String> tailedFiles = new HashSet<>();
446    private final List<String> dirList = new ArrayList<>();
447    private final Object printLock = new Object();
448
449    private final FilenameFilter LOG_FILES = new FilenameFilter() {
450      @Override
451      public boolean accept(File dir, String name) {
452        return name.endsWith(".out") || name.endsWith(".log");
453      }
454    };
455
456    @Override
457    public void run() {
458      try {
459        runInternal();
460      } catch (IOException ex) {
461        LOG.error(ex.toString(), ex);
462      }
463    }
464
465    private void runInternal() throws IOException {
466      Thread.currentThread().setName(getClass().getSimpleName());
467      while (true) {
468        scanDirs();
469        try {
470          Thread.sleep(500);
471        } catch (InterruptedException e) {
472          LOG.error("Log tailer thread interrupted", e);
473          break;
474        }
475      }
476    }
477
478    private void scanDirs() throws FileNotFoundException {
479      dirList.clear();
480      dirList.addAll(logTailDirs);
481      for (String d : dirList) {
482        for (File f : new File(d).listFiles(LOG_FILES)) {
483          String filePath = f.getAbsolutePath();
484          if (!tailedFiles.contains(filePath)) {
485            tailedFiles.add(filePath);
486            startTailingFile(filePath);
487          }
488        }
489      }
490    }
491
492    private void startTailingFile(final String filePath) throws FileNotFoundException {
493      final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
494      final ServerType serverType;
495      final int serverPort;
496      Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
497      if (m.matches()) {
498        serverType = ServerType.valueOf(m.group(1));
499        serverPort = Integer.valueOf(m.group(2));
500      } else {
501        LOG.error("Unrecognized log path format: " + filePath);
502        return;
503      }
504      final String logMsgPrefix =
505          "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] ";
506
507      LOG.debug("Tailing " + filePath);
508      Thread t = new Thread(new Runnable() {
509        @Override
510        public void run() {
511          try {
512            FileInputStream fis = new FileInputStream(filePath);
513            BufferedReader br = new BufferedReader(new InputStreamReader(fis));
514            String line;
515            while (true) {
516              try {
517                Thread.sleep(200);
518              } catch (InterruptedException e) {
519                LOG.error("Tailer for " + filePath + " interrupted");
520                break;
521              }
522              while ((line = br.readLine()) != null) {
523                line = logMsgPrefix + processLine(line);
524                synchronized (printLock) {
525                  if (line.endsWith("\n")) {
526                    dest.print(line);
527                  } else {
528                    dest.println(line);
529                  }
530                  dest.flush();
531                }
532              }
533            }
534          } catch (IOException ex) {
535            LOG.error("Failed tailing " + filePath, ex);
536          }
537        }
538      });
539      t.setDaemon(true);
540      t.setName("Tailer for " + filePath);
541      t.start();
542    }
543
544  }
545
546  private void startDaemonLogTailer() {
547    logTailerThread = new Thread(new LocalDaemonLogTailer());
548    logTailerThread.setDaemon(true);
549    logTailerThread.start();
550  }
551
552}
553