001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.BufferedReader; 021import java.io.BufferedWriter; 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.FileNotFoundException; 025import java.io.FileWriter; 026import java.io.FilenameFilter; 027import java.io.IOException; 028import java.io.InputStreamReader; 029import java.io.PrintStream; 030import java.util.ArrayList; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.List; 035import java.util.Map; 036import java.util.Scanner; 037import java.util.Set; 038import java.util.TreeMap; 039import java.util.regex.Matcher; 040import java.util.regex.Pattern; 041import org.apache.commons.io.FileUtils; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.hbase.HBaseTestingUtility; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.MiniHBaseCluster; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.testclassification.MiscTests; 049import org.apache.hadoop.hbase.zookeeper.ZKUtil; 050import org.apache.hadoop.hdfs.MiniDFSCluster; 051import org.junit.experimental.categories.Category; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * A helper class for process-based mini-cluster tests. Unlike {@link MiniHBaseCluster}, starts 057 * daemons as separate processes, allowing to do real kill testing. 058 */ 059@Category({ MiscTests.class, LargeTests.class }) 060public class ProcessBasedLocalHBaseCluster { 061 062 private final String hbaseHome, workDir; 063 private final Configuration conf; 064 private final int numMasters, numRegionServers, numDataNodes; 065 private final List<Integer> rsPorts, masterPorts; 066 067 private final int zkClientPort; 068 069 private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000; 070 071 private static final Logger LOG = LoggerFactory.getLogger(ProcessBasedLocalHBaseCluster.class); 072 073 private List<String> daemonPidFiles = Collections.synchronizedList(new ArrayList<String>());; 074 075 private boolean shutdownHookInstalled; 076 077 private String hbaseDaemonScript; 078 079 private MiniDFSCluster dfsCluster; 080 081 private HBaseTestingUtility testUtil; 082 083 private Thread logTailerThread; 084 085 private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>()); 086 087 private static enum ServerType { 088 MASTER("master"), 089 RS("regionserver"), 090 ZK("zookeeper"); 091 092 private final String fullName; 093 094 private ServerType(String fullName) { 095 this.fullName = fullName; 096 } 097 } 098 099 /** 100 * Constructor. Modifies the passed configuration. 101 * @param conf the {@link Configuration} to use 102 * @param numDataNodes the number of data nodes 103 * @param numRegionServers the number of region servers 104 */ 105 public ProcessBasedLocalHBaseCluster(Configuration conf, int numDataNodes, int numRegionServers) { 106 this.conf = conf; 107 this.hbaseHome = HBaseHomePath.getHomePath(); 108 this.numMasters = 1; 109 this.numRegionServers = numRegionServers; 110 this.workDir = hbaseHome + "/target/local_cluster"; 111 this.numDataNodes = numDataNodes; 112 113 hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh"; 114 zkClientPort = HBaseTestingUtility.randomFreePort(); 115 116 this.rsPorts = sortedPorts(numRegionServers); 117 this.masterPorts = sortedPorts(numMasters); 118 119 conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); 120 conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); 121 } 122 123 /** 124 * Makes this local HBase cluster use a mini-DFS cluster. Must be called before 125 * {@link #startHBase()}. n 126 */ 127 public void startMiniDFS() throws Exception { 128 if (testUtil == null) { 129 testUtil = new HBaseTestingUtility(conf); 130 } 131 dfsCluster = testUtil.startMiniDFSCluster(numDataNodes); 132 } 133 134 /** 135 * Generates a list of random port numbers in the sorted order. A sorted order makes sense if we 136 * ever want to refer to these servers by their index in the returned array, e.g. server #0, #1, 137 * etc. 138 */ 139 private static List<Integer> sortedPorts(int n) { 140 List<Integer> ports = new ArrayList<>(n); 141 for (int i = 0; i < n; ++i) { 142 ports.add(HBaseTestingUtility.randomFreePort()); 143 } 144 Collections.sort(ports); 145 return ports; 146 } 147 148 public void startHBase() throws IOException { 149 startDaemonLogTailer(); 150 cleanupOldState(); 151 152 // start ZK 153 LOG.info("Starting ZooKeeper on port " + zkClientPort); 154 startZK(); 155 156 HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort); 157 158 for (int masterPort : masterPorts) { 159 startMaster(masterPort); 160 } 161 162 ZKUtil.waitForBaseZNode(conf); 163 164 for (int rsPort : rsPorts) { 165 startRegionServer(rsPort); 166 } 167 168 LOG.info("Waiting for HBase startup by scanning META"); 169 int attemptsLeft = 10; 170 while (attemptsLeft-- > 0) { 171 try { 172 testUtil.getConnection().getTable(TableName.META_TABLE_NAME); 173 } catch (Exception e) { 174 LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft, e); 175 Threads.sleep(1000); 176 } 177 } 178 179 LOG.info("Process-based HBase Cluster with " + numRegionServers 180 + " region servers up and running... \n\n"); 181 } 182 183 public void startRegionServer(int port) { 184 startServer(ServerType.RS, port); 185 } 186 187 public void startMaster(int port) { 188 startServer(ServerType.MASTER, port); 189 } 190 191 public void killRegionServer(int port) throws IOException { 192 killServer(ServerType.RS, port); 193 } 194 195 public void killMaster() throws IOException { 196 killServer(ServerType.MASTER, 0); 197 } 198 199 public void startZK() { 200 startServer(ServerType.ZK, 0); 201 } 202 203 private void executeCommand(String command) { 204 executeCommand(command, null); 205 } 206 207 private void executeCommand(String command, Map<String, String> envOverrides) { 208 ensureShutdownHookInstalled(); 209 LOG.debug("Command : " + command); 210 211 try { 212 String[] envp = null; 213 if (envOverrides != null) { 214 Map<String, String> map = new HashMap<>(System.getenv()); 215 map.putAll(envOverrides); 216 envp = new String[map.size()]; 217 int idx = 0; 218 for (Map.Entry<String, String> e : map.entrySet()) { 219 envp[idx++] = e.getKey() + "=" + e.getValue(); 220 } 221 } 222 223 Process p = Runtime.getRuntime().exec(command, envp); 224 225 BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream())); 226 BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream())); 227 228 // read the output from the command 229 String s = null; 230 while ((s = stdInput.readLine()) != null) { 231 System.out.println(s); 232 } 233 234 // read any errors from the attempted command 235 while ((s = stdError.readLine()) != null) { 236 System.out.println(s); 237 } 238 } catch (IOException e) { 239 LOG.error("Error running: " + command, e); 240 } 241 } 242 243 private void shutdownAllProcesses() { 244 LOG.info("Killing daemons using pid files"); 245 final List<String> pidFiles = new ArrayList<>(daemonPidFiles); 246 for (String pidFile : pidFiles) { 247 int pid = 0; 248 try { 249 pid = readPidFromFile(pidFile); 250 } catch (IOException ex) { 251 LOG.error("Could not read pid from file " + pidFile); 252 } 253 254 if (pid > 0) { 255 LOG.info("Killing pid " + pid + " (" + pidFile + ")"); 256 killProcess(pid); 257 } 258 } 259 } 260 261 private void ensureShutdownHookInstalled() { 262 if (shutdownHookInstalled) { 263 return; 264 } 265 266 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 267 @Override 268 public void run() { 269 shutdownAllProcesses(); 270 } 271 })); 272 273 shutdownHookInstalled = true; 274 } 275 276 private void cleanupOldState() { 277 executeCommand("rm -rf " + workDir); 278 } 279 280 private void writeStringToFile(String s, String fileName) { 281 try { 282 BufferedWriter out = new BufferedWriter(new FileWriter(fileName)); 283 out.write(s); 284 out.close(); 285 } catch (IOException e) { 286 LOG.error("Error writing to: " + fileName, e); 287 } 288 } 289 290 private String serverWorkingDir(ServerType serverType, int port) { 291 return workDir + "/" + serverType + "-" + port; 292 } 293 294 private int getServerPID(ServerType serverType, int port) throws IOException { 295 String pidFile = pidFilePath(serverType, port); 296 return readPidFromFile(pidFile); 297 } 298 299 private static int readPidFromFile(String pidFile) throws IOException { 300 Scanner scanner = new Scanner(new File(pidFile)); 301 try { 302 return scanner.nextInt(); 303 } finally { 304 scanner.close(); 305 } 306 } 307 308 private String pidFilePath(ServerType serverType, int port) { 309 String dir = serverWorkingDir(serverType, port); 310 String user = System.getenv("USER"); 311 String pidFile = String.format("%s/hbase-%s-%s.pid", dir, user, serverType.fullName); 312 return pidFile; 313 } 314 315 private void killServer(ServerType serverType, int port) throws IOException { 316 int pid = getServerPID(serverType, port); 317 if (pid > 0) { 318 LOG.info("Killing " + serverType + "; pid=" + pid); 319 killProcess(pid); 320 } 321 } 322 323 private void killProcess(int pid) { 324 String cmd = "kill -s KILL " + pid; 325 executeCommand(cmd); 326 } 327 328 private void startServer(ServerType serverType, int rsPort) { 329 // create working directory for this region server. 330 String dir = serverWorkingDir(serverType, rsPort); 331 String confStr = generateConfig(serverType, rsPort, dir); 332 LOG.debug("Creating directory " + dir); 333 new File(dir).mkdirs(); 334 335 writeStringToFile(confStr, dir + "/hbase-site.xml"); 336 337 // Set debug options to an empty string so that hbase-config.sh does not configure them 338 // using default ports. If we want to run remote debugging on process-based local cluster's 339 // daemons, we can automatically choose non-conflicting JDWP and JMX ports for each daemon 340 // and specify them here. 341 writeStringToFile("unset HBASE_MASTER_OPTS\n" + "unset HBASE_REGIONSERVER_OPTS\n" 342 + "unset HBASE_ZOOKEEPER_OPTS\n" + "HBASE_MASTER_DBG_OPTS=' '\n" 343 + "HBASE_REGIONSERVER_DBG_OPTS=' '\n" + "HBASE_ZOOKEEPER_DBG_OPTS=' '\n" 344 + "HBASE_MASTER_JMX_OPTS=' '\n" + "HBASE_REGIONSERVER_JMX_OPTS=' '\n" 345 + "HBASE_ZOOKEEPER_JMX_OPTS=' '\n", dir + "/hbase-env.sh"); 346 347 Map<String, String> envOverrides = new HashMap<>(); 348 envOverrides.put("HBASE_LOG_DIR", dir); 349 envOverrides.put("HBASE_PID_DIR", dir); 350 try { 351 FileUtils.copyFile(new File(hbaseHome, "conf/log4j.properties"), 352 new File(dir, "log4j.properties")); 353 } catch (IOException ex) { 354 LOG.error("Could not install log4j.properties into " + dir); 355 } 356 357 executeCommand(hbaseDaemonScript + " --config " + dir + " start " + serverType.fullName, 358 envOverrides); 359 daemonPidFiles.add(pidFilePath(serverType, rsPort)); 360 logTailDirs.add(dir); 361 } 362 363 private final String generateConfig(ServerType serverType, int rpcPort, String daemonDir) { 364 StringBuilder sb = new StringBuilder(); 365 Map<String, Object> confMap = new TreeMap<>(); 366 confMap.put(HConstants.CLUSTER_DISTRIBUTED, true); 367 368 if (serverType == ServerType.MASTER) { 369 confMap.put(HConstants.MASTER_PORT, rpcPort); 370 371 int masterInfoPort = HBaseTestingUtility.randomFreePort(); 372 reportWebUIPort("master", masterInfoPort); 373 confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort); 374 } else if (serverType == ServerType.RS) { 375 confMap.put(HConstants.REGIONSERVER_PORT, rpcPort); 376 377 int rsInfoPort = HBaseTestingUtility.randomFreePort(); 378 reportWebUIPort("region server", rsInfoPort); 379 confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort); 380 } else { 381 confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir); 382 } 383 384 confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); 385 confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE); 386 387 if (dfsCluster != null) { 388 String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort(); 389 confMap.put("fs.defaultFS", fsURL); 390 confMap.put("hbase.rootdir", fsURL + "/hbase_test"); 391 } 392 393 sb.append("<configuration>\n"); 394 for (Map.Entry<String, Object> entry : confMap.entrySet()) { 395 sb.append(" <property>\n"); 396 sb.append(" <name>" + entry.getKey() + "</name>\n"); 397 sb.append(" <value>" + entry.getValue() + "</value>\n"); 398 sb.append(" </property>\n"); 399 } 400 sb.append("</configuration>\n"); 401 return sb.toString(); 402 } 403 404 private static void reportWebUIPort(String daemon, int port) { 405 LOG.info("Local " + daemon + " web UI is at http://" + HConstants.LOCALHOST + ":" + port); 406 } 407 408 public Configuration getConf() { 409 return conf; 410 } 411 412 public void shutdown() { 413 if (dfsCluster != null) { 414 dfsCluster.shutdown(); 415 } 416 shutdownAllProcesses(); 417 } 418 419 private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE = 420 Pattern.compile("org\\.apache\\.hadoop\\.hbase\\."); 421 422 private static final Pattern LOG_PATH_FORMAT_RE = Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$"); 423 424 private static String processLine(String line) { 425 Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line); 426 return m.replaceAll(""); 427 } 428 429 private final class LocalDaemonLogTailer implements Runnable { 430 private final Set<String> tailedFiles = new HashSet<>(); 431 private final List<String> dirList = new ArrayList<>(); 432 private final Object printLock = new Object(); 433 434 private final FilenameFilter LOG_FILES = new FilenameFilter() { 435 @Override 436 public boolean accept(File dir, String name) { 437 return name.endsWith(".out") || name.endsWith(".log"); 438 } 439 }; 440 441 @Override 442 public void run() { 443 try { 444 runInternal(); 445 } catch (IOException ex) { 446 LOG.error(ex.toString(), ex); 447 } 448 } 449 450 private void runInternal() throws IOException { 451 Thread.currentThread().setName(getClass().getSimpleName()); 452 while (true) { 453 scanDirs(); 454 try { 455 Thread.sleep(500); 456 } catch (InterruptedException e) { 457 LOG.error("Log tailer thread interrupted", e); 458 break; 459 } 460 } 461 } 462 463 private void scanDirs() throws FileNotFoundException { 464 dirList.clear(); 465 dirList.addAll(logTailDirs); 466 for (String d : dirList) { 467 for (File f : new File(d).listFiles(LOG_FILES)) { 468 String filePath = f.getAbsolutePath(); 469 if (!tailedFiles.contains(filePath)) { 470 tailedFiles.add(filePath); 471 startTailingFile(filePath); 472 } 473 } 474 } 475 } 476 477 private void startTailingFile(final String filePath) throws FileNotFoundException { 478 final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out; 479 final ServerType serverType; 480 final int serverPort; 481 Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath); 482 if (m.matches()) { 483 serverType = ServerType.valueOf(m.group(1)); 484 serverPort = Integer.valueOf(m.group(2)); 485 } else { 486 LOG.error("Unrecognized log path format: " + filePath); 487 return; 488 } 489 final String logMsgPrefix = 490 "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] "; 491 492 LOG.debug("Tailing " + filePath); 493 Thread t = new Thread(new Runnable() { 494 @Override 495 public void run() { 496 try { 497 FileInputStream fis = new FileInputStream(filePath); 498 BufferedReader br = new BufferedReader(new InputStreamReader(fis)); 499 String line; 500 while (true) { 501 try { 502 Thread.sleep(200); 503 } catch (InterruptedException e) { 504 LOG.error("Tailer for " + filePath + " interrupted"); 505 break; 506 } 507 while ((line = br.readLine()) != null) { 508 line = logMsgPrefix + processLine(line); 509 synchronized (printLock) { 510 if (line.endsWith("\n")) { 511 dest.print(line); 512 } else { 513 dest.println(line); 514 } 515 dest.flush(); 516 } 517 } 518 } 519 } catch (IOException ex) { 520 LOG.error("Failed tailing " + filePath, ex); 521 } 522 } 523 }); 524 t.setDaemon(true); 525 t.setName("Tailer for " + filePath); 526 t.start(); 527 } 528 529 } 530 531 private void startDaemonLogTailer() { 532 logTailerThread = new Thread(new LocalDaemonLogTailer()); 533 logTailerThread.setDaemon(true); 534 logTailerThread.start(); 535 } 536 537}