/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A helper class for process-based mini-cluster tests. Unlike
 * {@link MiniHBaseCluster}, this starts daemons as separate processes, making
 * it possible to do real kill testing.
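 *
 * <p>A minimal usage sketch (hypothetical test body; it assumes an HBase
 * source checkout so that {@code bin/hbase-daemon.sh} is available):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * ProcessBasedLocalHBaseCluster cluster =
 *     new ProcessBasedLocalHBaseCluster(conf, 2, 3); // 2 data nodes, 3 RSs
 * cluster.startMiniDFS();  // must precede startHBase()
 * cluster.startHBase();
 * // ... kill and restart individual daemons, e.g.
 * // cluster.killRegionServer(rsPort), then verify recovery ...
 * cluster.shutdown();
 * </pre>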
 */
@Category({MiscTests.class, LargeTests.class})
public class ProcessBasedLocalHBaseCluster {

  private final String hbaseHome, workDir;
  private final Configuration conf;
  private final int numMasters, numRegionServers, numDataNodes;
  private final List<Integer> rsPorts, masterPorts;

  private final int zkClientPort;

  private static final int MAX_FILE_SIZE_OVERRIDE = 10 * 1000 * 1000;

  private static final Logger LOG = LoggerFactory.getLogger(
      ProcessBasedLocalHBaseCluster.class);

  private List<String> daemonPidFiles =
      Collections.synchronizedList(new ArrayList<String>());

  private boolean shutdownHookInstalled;

  private String hbaseDaemonScript;

  private MiniDFSCluster dfsCluster;

  private HBaseTestingUtility testUtil;

  private Thread logTailerThread;

  private List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());

  private enum ServerType {
    MASTER("master"),
    RS("regionserver"),
    ZK("zookeeper");

    private final String fullName;

    ServerType(String fullName) {
      this.fullName = fullName;
    }
  }

  /**
   * Constructor. Modifies the passed configuration.
   * @param conf the {@link Configuration} to use
   * @param numDataNodes the number of data nodes
   * @param numRegionServers the number of region servers
   */
  public ProcessBasedLocalHBaseCluster(Configuration conf,
      int numDataNodes, int numRegionServers) {
    this.conf = conf;
    this.hbaseHome = HBaseHomePath.getHomePath();
    this.numMasters = 1;
    this.numRegionServers = numRegionServers;
    this.workDir = hbaseHome + "/target/local_cluster";
    this.numDataNodes = numDataNodes;

    hbaseDaemonScript = hbaseHome + "/bin/hbase-daemon.sh";
    zkClientPort = HBaseTestingUtility.randomFreePort();

    this.rsPorts = sortedPorts(numRegionServers);
    this.masterPorts = sortedPorts(numMasters);

    conf.set(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
  }

  /**
   * Makes this local HBase cluster use a mini-DFS cluster. Must be called
   * before {@link #startHBase()}.
   * @throws Exception if starting the mini-DFS cluster fails
   */
  public void startMiniDFS() throws Exception {
    if (testUtil == null) {
      testUtil = new HBaseTestingUtility(conf);
    }
    dfsCluster = testUtil.startMiniDFSCluster(numDataNodes);
  }

  /**
   * Generates a list of random port numbers in sorted order. A sorted
   * order makes sense if we ever want to refer to these servers by their index
   * in the returned list, e.g. server #0, #1, etc.
   */
  private static List<Integer> sortedPorts(int n) {
    List<Integer> ports = new ArrayList<>(n);
    for (int i = 0; i < n; ++i) {
      ports.add(HBaseTestingUtility.randomFreePort());
    }
    Collections.sort(ports);
    return ports;
  }

  public void startHBase() throws IOException {
    startDaemonLogTailer();
    cleanupOldState();

    // start ZK
    LOG.info("Starting ZooKeeper on port " + zkClientPort);
    startZK();

    HBaseTestingUtility.waitForHostPort(HConstants.LOCALHOST, zkClientPort);

    for (int masterPort : masterPorts) {
      startMaster(masterPort);
    }

    ZKUtil.waitForBaseZNode(conf);

    for (int rsPort : rsPorts) {
      startRegionServer(rsPort);
    }

    LOG.info("Waiting for HBase startup by scanning META");
    int attemptsLeft = 10;
    while (attemptsLeft-- > 0) {
      try {
        testUtil.getConnection().getTable(TableName.META_TABLE_NAME);
        break; // META is accessible, so the cluster is up.
      } catch (Exception e) {
        LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft,
            e);
        Threads.sleep(1000);
      }
    }

    LOG.info("Process-based HBase Cluster with " + numRegionServers +
        " region servers up and running... \n\n");
  }

  public void startRegionServer(int port) {
    startServer(ServerType.RS, port);
  }

  public void startMaster(int port) {
    startServer(ServerType.MASTER, port);
  }

  public void killRegionServer(int port) throws IOException {
    killServer(ServerType.RS, port);
  }

  public void killMaster() throws IOException {
    killServer(ServerType.MASTER, 0);
  }

  public void startZK() {
    startServer(ServerType.ZK, 0);
  }

  private void executeCommand(String command) {
    executeCommand(command, null);
  }

  private void executeCommand(String command,
      Map<String, String> envOverrides) {
    ensureShutdownHookInstalled();
    LOG.debug("Command : " + command);

    try {
      String[] envp = null;
      if (envOverrides != null) {
        Map<String, String> map = new HashMap<>(System.getenv());
        map.putAll(envOverrides);
        envp = new String[map.size()];
        int idx = 0;
        for (Map.Entry<String, String> e : map.entrySet()) {
          envp[idx++] = e.getKey() + "=" + e.getValue();
        }
      }

      Process p = Runtime.getRuntime().exec(command, envp);

      BufferedReader stdInput = new BufferedReader(
          new InputStreamReader(p.getInputStream()));
      BufferedReader stdError = new BufferedReader(
          new InputStreamReader(p.getErrorStream()));

      // Read the output from the command. Stdout is drained fully before
      // stderr; the daemon scripts emit little output, so this is acceptable.
      String s = null;
      while ((s = stdInput.readLine()) != null) {
        System.out.println(s);
      }

      // Read any errors from the attempted command.
      while ((s = stdError.readLine()) != null) {
        System.out.println(s);
      }
    } catch (IOException e) {
      LOG.error("Error running: " + command, e);
    }
  }

  private void shutdownAllProcesses() {
    LOG.info("Killing daemons using pid files");
    final List<String> pidFiles = new ArrayList<>(daemonPidFiles);
    for (String pidFile : pidFiles) {
      int pid = 0;
      try {
        pid = readPidFromFile(pidFile);
      } catch (IOException ex) {
        LOG.error("Could not read pid from file " + pidFile, ex);
      }

      if (pid > 0) {
        LOG.info("Killing pid " + pid + " (" + pidFile + ")");
        killProcess(pid);
      }
    }
  }

  private void ensureShutdownHookInstalled() {
    if (shutdownHookInstalled) {
      return;
    }

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      @Override
      public void run() {
        shutdownAllProcesses();
      }
    }));

    shutdownHookInstalled = true;
  }

  private void cleanupOldState() {
    executeCommand("rm -rf " + workDir);
  }

  private void writeStringToFile(String s, String fileName) {
    try (BufferedWriter out = new BufferedWriter(new FileWriter(fileName))) {
      out.write(s);
    } catch (IOException e) {
      LOG.error("Error writing to: " + fileName, e);
    }
  }

  private String serverWorkingDir(ServerType serverType, int port) {
    return workDir + "/" + serverType + "-" + port;
  }

  private int getServerPID(ServerType serverType, int port) throws IOException {
    String pidFile = pidFilePath(serverType, port);
    return readPidFromFile(pidFile);
  }

  private static int readPidFromFile(String pidFile) throws IOException {
    try (Scanner scanner = new Scanner(new File(pidFile))) {
      return scanner.nextInt();
    }
  }

  private String pidFilePath(ServerType serverType, int port) {
    String dir = serverWorkingDir(serverType, port);
    String user = System.getenv("USER");
    String pidFile = String.format("%s/hbase-%s-%s.pid",
        dir, user, serverType.fullName);
    return pidFile;
  }

  private void killServer(ServerType serverType, int port) throws IOException {
    int pid = getServerPID(serverType, port);
    if (pid > 0) {
      LOG.info("Killing " + serverType + "; pid=" + pid);
      killProcess(pid);
    }
  }

  private void killProcess(int pid) {
    String cmd = "kill -s KILL " + pid;
    executeCommand(cmd);
  }

  private void startServer(ServerType serverType, int rsPort) {
    // Create the working directory for this server.
    String dir = serverWorkingDir(serverType, rsPort);
    String confStr = generateConfig(serverType, rsPort, dir);
    LOG.debug("Creating directory " + dir);
    new File(dir).mkdirs();

    writeStringToFile(confStr, dir + "/hbase-site.xml");

    // Set debug options to an empty string so that hbase-config.sh does not configure them
    // using default ports. If we want to run remote debugging on process-based local cluster's
    // daemons, we can automatically choose non-conflicting JDWP and JMX ports for each daemon
    // and specify them here.
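    // A hypothetical sketch of what that could look like, with a distinct
    // free port picked per daemon (e.g. via HBaseTestingUtility.randomFreePort()):
    //   "HBASE_MASTER_DBG_OPTS='-agentlib:jdwp=transport=dt_socket,"
    //       + "server=y,suspend=n,address=" + debugPort + "'\n"
    // Not implemented here; the debug options are simply blanked out below.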
    writeStringToFile(
        "unset HBASE_MASTER_OPTS\n" +
        "unset HBASE_REGIONSERVER_OPTS\n" +
        "unset HBASE_ZOOKEEPER_OPTS\n" +
        "HBASE_MASTER_DBG_OPTS=' '\n" +
        "HBASE_REGIONSERVER_DBG_OPTS=' '\n" +
        "HBASE_ZOOKEEPER_DBG_OPTS=' '\n" +
        "HBASE_MASTER_JMX_OPTS=' '\n" +
        "HBASE_REGIONSERVER_JMX_OPTS=' '\n" +
        "HBASE_ZOOKEEPER_JMX_OPTS=' '\n",
        dir + "/hbase-env.sh");

    Map<String, String> envOverrides = new HashMap<>();
    envOverrides.put("HBASE_LOG_DIR", dir);
    envOverrides.put("HBASE_PID_DIR", dir);
    try {
      FileUtils.copyFile(
          new File(hbaseHome, "conf/log4j.properties"),
          new File(dir, "log4j.properties"));
    } catch (IOException ex) {
      LOG.error("Could not install log4j.properties into " + dir, ex);
    }

    executeCommand(hbaseDaemonScript + " --config " + dir +
        " start " + serverType.fullName, envOverrides);
    daemonPidFiles.add(pidFilePath(serverType, rsPort));
    logTailDirs.add(dir);
  }

  private String generateConfig(ServerType serverType, int rpcPort,
      String daemonDir) {
    StringBuilder sb = new StringBuilder();
    Map<String, Object> confMap = new TreeMap<>();
    confMap.put(HConstants.CLUSTER_DISTRIBUTED, true);

    if (serverType == ServerType.MASTER) {
      confMap.put(HConstants.MASTER_PORT, rpcPort);

      int masterInfoPort = HBaseTestingUtility.randomFreePort();
      reportWebUIPort("master", masterInfoPort);
      confMap.put(HConstants.MASTER_INFO_PORT, masterInfoPort);
    } else if (serverType == ServerType.RS) {
      confMap.put(HConstants.REGIONSERVER_PORT, rpcPort);

      int rsInfoPort = HBaseTestingUtility.randomFreePort();
      reportWebUIPort("region server", rsInfoPort);
      confMap.put(HConstants.REGIONSERVER_INFO_PORT, rsInfoPort);
    } else {
      confMap.put(HConstants.ZOOKEEPER_DATA_DIR, daemonDir);
    }

    confMap.put(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
    confMap.put(HConstants.HREGION_MAX_FILESIZE, MAX_FILE_SIZE_OVERRIDE);

    if (dfsCluster != null) {
      String fsURL = "hdfs://" + HConstants.LOCALHOST + ":" + dfsCluster.getNameNodePort();
      confMap.put("fs.defaultFS", fsURL);
      confMap.put("hbase.rootdir", fsURL + "/hbase_test");
    }

    sb.append("<configuration>\n");
    for (Map.Entry<String, Object> entry : confMap.entrySet()) {
      sb.append("  <property>\n");
      sb.append("    <name>" + entry.getKey() + "</name>\n");
      sb.append("    <value>" + entry.getValue() + "</value>\n");
      sb.append("  </property>\n");
    }
    sb.append("</configuration>\n");
    return sb.toString();
  }

  private static void reportWebUIPort(String daemon, int port) {
    LOG.info("Local " + daemon + " web UI is at http://"
        + HConstants.LOCALHOST + ":" + port);
  }

  public Configuration getConf() {
    return conf;
  }

  public void shutdown() {
    if (dfsCluster != null) {
      dfsCluster.shutdown();
    }
    shutdownAllProcesses();
  }

  /** Strips the verbose "org.apache.hadoop.hbase." package prefix from log lines. */
  private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE =
      Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");

  /**
   * Extracts the server type and port from a daemon log path, e.g.
   * ".../RS-61234/hbase-user-regionserver.log" yields groups ("RS", "61234").
   */
  private static final Pattern LOG_PATH_FORMAT_RE =
      Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");

  private static String processLine(String line) {
    Matcher m = TO_REMOVE_FROM_LOG_LINES_RE.matcher(line);
    return m.replaceAll("");
  }

  private final class LocalDaemonLogTailer implements Runnable {
    private final Set<String> tailedFiles = new HashSet<>();
    private final List<String> dirList = new ArrayList<>();
    private final Object printLock = new Object();
    private final FilenameFilter LOG_FILES = new FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
        return name.endsWith(".out") || name.endsWith(".log");
      }
    };

    @Override
    public void run() {
      try {
        runInternal();
      } catch (IOException ex) {
        LOG.error(ex.toString(), ex);
      }
    }

    private void runInternal() throws IOException {
      Thread.currentThread().setName(getClass().getSimpleName());
      while (true) {
        scanDirs();
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          LOG.error("Log tailer thread interrupted", e);
          break;
        }
      }
    }

    private void scanDirs() throws FileNotFoundException {
      dirList.clear();
      dirList.addAll(logTailDirs);
      for (String d : dirList) {
        File[] logFiles = new File(d).listFiles(LOG_FILES);
        if (logFiles == null) {
          continue; // The daemon directory might not have been created yet.
        }
        for (File f : logFiles) {
          String filePath = f.getAbsolutePath();
          if (!tailedFiles.contains(filePath)) {
            tailedFiles.add(filePath);
            startTailingFile(filePath);
          }
        }
      }
    }

    private void startTailingFile(final String filePath) throws FileNotFoundException {
      final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
      final ServerType serverType;
      final int serverPort;
      Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
      if (m.matches()) {
        serverType = ServerType.valueOf(m.group(1));
        serverPort = Integer.parseInt(m.group(2));
      } else {
        LOG.error("Unrecognized log path format: " + filePath);
        return;
      }
      final String logMsgPrefix =
          "[" + serverType + (serverPort != 0 ? ":" + serverPort : "") + "] ";

      LOG.debug("Tailing " + filePath);
      Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
          try (BufferedReader br = new BufferedReader(
              new InputStreamReader(new FileInputStream(filePath)))) {
            String line;
            while (true) {
              try {
                Thread.sleep(200);
              } catch (InterruptedException e) {
                LOG.error("Tailer for " + filePath + " interrupted");
                break;
              }
              while ((line = br.readLine()) != null) {
                line = logMsgPrefix + processLine(line);
                synchronized (printLock) {
                  // readLine() strips the line terminator, so always println.
                  dest.println(line);
                  dest.flush();
                }
              }
            }
          } catch (IOException ex) {
            LOG.error("Failed tailing " + filePath, ex);
          }
        }
      });
      t.setDaemon(true);
      t.setName("Tailer for " + filePath);
      t.start();
    }

  }

  private void startDaemonLogTailer() {
    logTailerThread = new Thread(new LocalDaemonLogTailer());
    logTailerThread.setDaemon(true);
    logTailerThread.start();
  }

}