/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A helper class for process-based mini-cluster tests. Unlike {@link MiniHBaseCluster}, this
 * class starts daemons as separate processes, which makes real kill testing possible.
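 * <p>
 * A minimal usage sketch (hypothetical test code, not taken from an existing test):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * ProcessBasedLocalHBaseCluster cluster =
 *   new ProcessBasedLocalHBaseCluster(conf, 2, 3); // 2 data nodes, 3 region servers
 * cluster.startMiniDFS(); // optional, but must be called before startHBase()
 * cluster.startHBase();
 * try {
 *   // exercise the cluster, e.g. kill a region server and verify recovery
 * } finally {
 *   cluster.shutdown();
 * }
 * </pre>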
 */
@Category({ MiscTests.class, LargeTests.class })
public class ProcessBasedLocalHBaseCluster {

  private final String hbaseHome, workDir;
  private final Configuration conf;
  private final int numMasters, numRegionServers, numDataNodes;
  private final List<Integer> rsPorts, masterPorts;

  private final int zkClientPort;

  private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;

  private static final Logger LOG = LoggerFactory.getLogger(ProcessBasedLocalHBaseCluster.class);

  private List<String> daemonPidFiles = Collections.synchronizedList(new ArrayList<String>());

  private boolean shutdownHookInstalled;

  private String hbaseDaemonScript;

  private MiniDFSCluster dfsCluster;

  private HBaseTestingUtility testUtil;

  private Thread logTailerThread;

  private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());

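  /** The daemon types this cluster can manage; fullName is the hbase-daemon.sh command name. */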
  private enum ServerType {
    MASTER("master"),
    RS("regionserver"),
    ZK("zookeeper");

    private final String fullName;

    ServerType(String fullName) {
      this.fullName = fullName;
    }
  }

  /**
   * Constructor. Modifies the passed configuration.
   * @param conf             the {@link Configuration} to use
   * @param numDataNodes     the number of data nodes
   * @param numRegionServers the number of region servers
   */
  public ProcessBasedLocalHBaseCluster(Configuration conf, int numDataNodes, int numRegionServers) {
    this.conf = conf;
    this.hbaseHome = HBaseHomePath.getHomePath();
    this.numMasters = 1;
    this.numRegionServers = numRegionServers;
    this.workDir = hbaseHome + "/target/local_cluster";
    this.numDataNodes = numDataNodes;

    hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
    zkClientPort = HBaseTestingUtility.randomFreePort();

    this.rsPorts = sortedPorts(numRegionServers);
    this.masterPorts = sortedPorts(numMasters);

    conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
  }

  /**
   * Makes this local HBase cluster use a mini-DFS cluster. Must be called before
   * {@link #startHBase()}.
   */
  public void startMiniDFS() throws Exception {
    if (testUtil == null) {
      testUtil = new HBaseTestingUtility(conf);
    }
    dfsCluster = testUtil.startMiniDFSCluster(numDataNodes);
  }

  /**
   * Generates a list of random port numbers in sorted order. Sorted order makes sense if we ever
   * want to refer to these servers by their index in the returned list, e.g. server #0, #1, etc.
   */
  private static List<Integer> sortedPorts(int n) {
    List<Integer> ports = new ArrayList<>(n);
    for (int i = 0; i < n; ++i) {
      ports.add(HBaseTestingUtility.randomFreePort());
    }
    Collections.sort(ports);
    return ports;
  }

  public void startHBase() throws IOException {
    if (testUtil == null) {
      // Also created in startMiniDFS(), but that call is optional; the META wait below needs
      // testUtil either way.
      testUtil = new HBaseTestingUtility(conf);
    }

    startDaemonLogTailer();
    cleanupOldState();

    // start ZK
    LOG.info("Starting ZooKeeper on port " + zkClientPort);
    startZK();

    HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort);

    for (int masterPort : masterPorts) {
      startMaster(masterPort);
    }

    ZKUtil.waitForBaseZNode(conf);

    for (int rsPort : rsPorts) {
      startRegionServer(rsPort);
    }

    LOG.info("Waiting for HBase startup by scanning META");
    int attemptsLeft = 10;
    while (attemptsLeft-- > 0) {
      try {
        testUtil.getConnection().getTable(TableName.META_TABLE_NAME).close();
        break; // success: stop retrying
      } catch (Exception e) {
        LOG.info("Waiting for HBase to start up. Retries left: " + attemptsLeft, e);
        Threads.sleep(1000);
      }
    }

    LOG.info("Process-based HBase Cluster with " + numRegionServers
      + " region servers up and running... \n\n");
  }

  public void startRegionServer(int port) {
    startServer(ServerType.RS, port);
  }

  public void startMaster(int port) {
    startServer(ServerType.MASTER, port);
  }

  public void killRegionServer(int port) throws IOException {
    killServer(ServerType.RS, port);
  }

  public void killMaster() throws IOException {
    // Masters are started on the ports in masterPorts, so look up the first (and, with
    // numMasters == 1, the only) master's port rather than using a literal 0, which would
    // point at a pid file in a nonexistent MASTER-0 directory.
    killServer(ServerType.MASTER, masterPorts.get(0));
  }

  public void startZK() {
    startServer(ServerType.ZK, 0);
  }

  private void executeCommand(String command) {
    executeCommand(command, null);
  }

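  /**
   * Runs the given shell command synchronously, merging any supplied overrides into the current
   * process environment, and echoes the command's stdout and stderr to our stdout.
   */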
  private void executeCommand(String command, Map<String, String> envOverrides) {
    ensureShutdownHookInstalled();
    LOG.debug("Command: " + command);

    try {
      String[] envp = null;
      if (envOverrides != null) {
        Map<String, String> map = new HashMap<>(System.getenv());
        map.putAll(envOverrides);
        envp = new String[map.size()];
        int idx = 0;
        for (Map.Entry<String, String> e : map.entrySet()) {
          envp[idx++] = e.getKey() + "=" + e.getValue();
        }
      }

      Process p = Runtime.getRuntime().exec(command, envp);

      BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
      BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream()));

      // read the output from the command
      String s = null;
      while ((s = stdInput.readLine()) != null) {
        System.out.println(s);
      }

      // read any errors from the attempted command
      while ((s = stdError.readLine()) != null) {
        System.out.println(s);
      }
    } catch (IOException e) {
      LOG.error("Error running: " + command, e);
    }
  }

  private void shutdownAllProcesses() {
    LOG.info("Killing daemons using pid files");
    final List<String> pidFiles = new ArrayList<>(daemonPidFiles);
    for (String pidFile : pidFiles) {
      int pid = 0;
      try {
        pid = readPidFromFile(pidFile);
      } catch (IOException ex) {
        LOG.error("Could not read pid from file " + pidFile, ex);
      }

      if (pid > 0) {
        LOG.info("Killing pid " + pid + " (" + pidFile + ")");
        killProcess(pid);
      }
    }
  }

  private void ensureShutdownHookInstalled() {
    if (shutdownHookInstalled) {
      return;
    }

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      @Override
      public void run() {
        shutdownAllProcesses();
      }
    }));

    shutdownHookInstalled = true;
  }

  private void cleanupOldState() {
    executeCommand("rm -rf " + workDir);
  }

  private void writeStringToFile(String s, String fileName) {
    // try-with-resources so the writer is closed even if write() fails
    try (BufferedWriter out = new BufferedWriter(new FileWriter(fileName))) {
      out.write(s);
    } catch (IOException e) {
      LOG.error("Error writing to: " + fileName, e);
    }
  }

  private String serverWorkingDir(ServerType serverType, int port) {
    return workDir + "/" + serverType + "-" + port;
  }

  private int getServerPID(ServerType serverType, int port) throws IOException {
    String pidFile = pidFilePath(serverType, port);
    return readPidFromFile(pidFile);
  }

  private static int readPidFromFile(String pidFile) throws IOException {
    try (Scanner scanner = new Scanner(new File(pidFile))) {
      return scanner.nextInt();
    }
  }

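  /**
   * Returns the path of the pid file that hbase-daemon.sh writes for the given daemon, e.g.
   * (user name and port illustrative) {@code <workDir>/RS-36201/hbase-alice-regionserver.pid}.
   */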
  private String pidFilePath(ServerType serverType, int port) {
    String dir = serverWorkingDir(serverType, port);
    String user = System.getenv("USER");
    String pidFile = String.format("%s/hbase-%s-%s.pid", dir, user, serverType.fullName);
    return pidFile;
  }

  private void killServer(ServerType serverType, int port) throws IOException {
    int pid = getServerPID(serverType, port);
    if (pid > 0) {
      LOG.info("Killing " + serverType + "; pid=" + pid);
      killProcess(pid);
    }
  }

  private void killProcess(int pid) {
    String cmd = "kill -s KILL " + pid;
    executeCommand(cmd);
  }

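  /**
   * Starts a daemon of the given type in its own working directory: writes a per-daemon
   * hbase-site.xml and hbase-env.sh, installs log4j.properties, and launches the process through
   * hbase-daemon.sh with HBASE_LOG_DIR and HBASE_PID_DIR pointed at that directory.
   */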
  private void startServer(ServerType serverType, int port) {
    // Create the working directory for this daemon (master, region server, or ZK).
    String dir = serverWorkingDir(serverType, port);
    String confStr = generateConfig(serverType, port, dir);
    LOG.debug("Creating directory " + dir);
    new File(dir).mkdirs();

    writeStringToFile(confStr, dir + "/hbase-site.xml");

    // Set debug options to an empty string so that hbase-config.sh does not configure them
    // using default ports. If we want to run remote debugging on process-based local cluster's
    // daemons, we can automatically choose non-conflicting JDWP and JMX ports for each daemon
    // and specify them here.
    writeStringToFile("unset HBASE_MASTER_OPTS\n" + "unset HBASE_REGIONSERVER_OPTS\n"
      + "unset HBASE_ZOOKEEPER_OPTS\n" + "HBASE_MASTER_DBG_OPTS=' '\n"
      + "HBASE_REGIONSERVER_DBG_OPTS=' '\n" + "HBASE_ZOOKEEPER_DBG_OPTS=' '\n"
      + "HBASE_MASTER_JMX_OPTS=' '\n" + "HBASE_REGIONSERVER_JMX_OPTS=' '\n"
      + "HBASE_ZOOKEEPER_JMX_OPTS=' '\n", dir + "/hbase-env.sh");

    Map<String, String> envOverrides = new HashMap<>();
    envOverrides.put("HBASE_LOG_DIR", dir);
    envOverrides.put("HBASE_PID_DIR", dir);
    try {
      FileUtils.copyFile(new File(hbaseHome, "conf/log4j.properties"),
        new File(dir, "log4j.properties"));
    } catch (IOException ex) {
      LOG.error("Could not install log4j.properties into " + dir, ex);
    }

    executeCommand(hbaseDaemonScript + " --config " + dir + " start " + serverType.fullName,
      envOverrides);
    daemonPidFiles.add(pidFilePath(serverType, port));
    logTailDirs.add(dir);
  }

  private String generateConfig(ServerType serverType, int rpcPort, String daemonDir) {
    StringBuilder sb = new StringBuilder();
    Map<String, Object> confMap = new TreeMap<>();
    confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);

    if (serverType == ServerType.MASTER) {
      confMap.put(HConstants.MASTER_PORT, rpcPort);

      int masterInfoPort = HBaseTestingUtility.randomFreePort();
      reportWebUIPort("master", masterInfoPort);
      confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort);
    } else if (serverType == ServerType.RS) {
      confMap.put(HConstants.REGIONSERVER_PORT, rpcPort);

      int rsInfoPort = HBaseTestingUtility.randomFreePort();
      reportWebUIPort("region server", rsInfoPort);
      confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort);
    } else {
      confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir);
    }

    confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
    confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);

    if (dfsCluster != null) {
      String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort();
      confMap.put("fs.defaultFS", fsURL);
      confMap.put("hbase.rootdir", fsURL + "/hbase_test");
    }

    sb.append("<configuration>\n");
    for (Map.Entry<String, Object> entry : confMap.entrySet()) {
      sb.append("  <property>\n");
      sb.append("    <name>" + entry.getKey() + "</name>\n");
      sb.append("    <value>" + entry.getValue() + "</value>\n");
      sb.append("  </property>\n");
    }
    sb.append("</configuration>\n");
    return sb.toString();
  }
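
  // For reference, generateConfig produces a minimal hbase-site.xml. For a region server it
  // looks roughly like the following (port values are random; the one below is a placeholder):
  //
  //   <configuration>
  //     <property>
  //       <name>hbase.cluster.distributed</name>
  //       <value>true</value>
  //     </property>
  //     <property>
  //       <name>hbase.regionserver.port</name>
  //       <value>36201</value>
  //     </property>
  //     ...
  //   </configuration>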

  private static void reportWebUIPort(String daemon, int port) {
    LOG.info("Local " + daemon + " web UI is at http://" + HConstants.LOCALHOST + ":" + port);
  }

  public Configuration getConf() {
    return conf;
  }

  public void shutdown() {
    if (dfsCluster != null) {
      dfsCluster.shutdown();
    }
    shutdownAllProcesses();
  }

  private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE =
    Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");

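  // Extracts the server type and RPC port from a daemon log path created by startServer(),
  // e.g. (path is illustrative) ".../local_cluster/RS-36201/hbase-alice-regionserver-host.log"
  // yields server type "RS" and port 36201.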
  private static final Pattern LOG_PATH_FORMAT_RE = Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");

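  /**
   * Shortens a log line by stripping the "org.apache.hadoop.hbase." package prefix, e.g.
   * "... INFO org.apache.hadoop.hbase.master.HMaster ..." becomes "... INFO master.HMaster ...".
   */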
  private static String processLine(String line) {
    Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line);
    return m.replaceAll("");
  }

  private final class LocalDaemonLogTailer implements Runnable {
    private final Set<String> tailedFiles = new HashSet<>();
    private final List<String> dirList = new ArrayList<>();
    private final Object printLock = new Object();

    private final FilenameFilter LOG_FILES = new FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
        return name.endsWith(".out") || name.endsWith(".log");
      }
    };

    @Override
    public void run() {
      try {
        runInternal();
      } catch (IOException ex) {
        LOG.error(ex.toString(), ex);
      }
    }

    private void runInternal() throws IOException {
      Thread.currentThread().setName(getClass().getSimpleName());
      while (true) {
        scanDirs();
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          LOG.error("Log tailer thread interrupted", e);
          break;
        }
      }
    }

    private void scanDirs() throws FileNotFoundException {
      dirList.clear();
      dirList.addAll(logTailDirs);
      for (String d : dirList) {
        File[] logFiles = new File(d).listFiles(LOG_FILES);
        if (logFiles == null) {
          // The daemon's directory may not have been created yet.
          continue;
        }
        for (File f : logFiles) {
          String filePath = f.getAbsolutePath();
          if (!tailedFiles.contains(filePath)) {
            tailedFiles.add(filePath);
            startTailingFile(filePath);
          }
        }
      }
    }

    private void startTailingFile(final String filePath) throws FileNotFoundException {
      final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
      final ServerType serverType;
      final int serverPort;
      Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
      if (m.matches()) {
        serverType = ServerType.valueOf(m.group(1));
        serverPort = Integer.parseInt(m.group(2));
      } else {
        LOG.error("Unrecognized log path format: " + filePath);
        return;
      }
      final String logMsgPrefix =
        "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] ";

      LOG.debug("Tailing " + filePath);
      Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            FileInputStream fis = new FileInputStream(filePath);
            BufferedReader br = new BufferedReader(new InputStreamReader(fis));
            String line;
            while (true) {
              try {
                Thread.sleep(200);
              } catch (InterruptedException e) {
                LOG.error("Tailer for " + filePath + " interrupted");
                break;
              }
              while ((line = br.readLine()) != null) {
                line = logMsgPrefix + processLine(line);
                synchronized (printLock) {
                  // readLine() strips the line terminator, so println is always correct here.
                  dest.println(line);
                  dest.flush();
                }
              }
            }
          } catch (IOException ex) {
            LOG.error("Failed tailing " + filePath, ex);
          }
        }
      });
      t.setDaemon(true);
      t.setName("Tailer for " + filePath);
      t.start();
    }

  }

  private void startDaemonLogTailer() {
    logTailerThread = new Thread(new LocalDaemonLogTailer());
    logTailerThread.setDaemon(true);
    logTailerThread.start();
  }

}