/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link SplitLogManager}. Every test runs against a freshly started mini ZooKeeper
 * cluster and drives the manager by writing {@link SplitLogTask} states directly into task znodes,
 * then observing the {@link SplitLogCounters} and the resulting znode contents.
 */
@Category({ MasterTests.class, LargeTests.class })
public class TestSplitLogManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSplitLogManager.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);

  /** Mocked server manager; {@link #setup()} makes it report every server as online. */
  private final ServerManager sm = Mockito.mock(ServerManager.class);

  private ZKWatcher zkw;
  private DummyMasterServices master;
  private SplitLogManager slm;
  private Configuration conf;
  /** Base wait budget, in milliseconds, that the tests scale their counter waits from. */
  private int to;

  // Re-created by setup() before every test; static although it is assigned per test.
  private static HBaseTestingUtility TEST_UTIL;

  /**
   * Minimal master services backed by the test's {@link ZKWatcher}, a
   * {@link ZkCoordinatedStateManager} and the mocked {@link ServerManager} of the enclosing test.
   */
  class DummyMasterServices extends MockNoopMasterServices {
    private ZKWatcher zkw;
    private CoordinatedStateManager cm;

    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
      super(conf);
      this.zkw = zkw;
      cm = new ZkCoordinatedStateManager(this);
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return cm;
    }

    @Override
    public ServerManager getServerManager() {
      return sm;
    }
  }

  /**
   * Starts a mini ZooKeeper cluster, creates the base and split-log znodes, resets the split log
   * counters and configures the manager timeouts used by the tests.
   */
  @Before
  public void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniZKCluster();
    conf = TEST_UTIL.getConfiguration();
    // Use a different ZK wrapper instance for each test.
    zkw =
      new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
    master = new DummyMasterServices(zkw, conf);

    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
    LOG.debug("{} created", zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
    LOG.debug("{} created", zkw.getZNodePaths().splitLogZNode);

    resetCounters();

    // By default, we let the test manage the error as before, so the server
    // does not appear as dead from the master point of view, only from the split log pov.
    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);

    to = 12000;
    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);

    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
    // Pad the wait budget by 16 monitor periods of 100 ms each.
    to = to + 16 * 100;
  }

  /**
   * Stops the master stub and the manager, closes the ZooKeeper watcher opened by
   * {@link #setup()} and shuts the mini ZooKeeper cluster down. The cluster shutdown runs even if
   * one of the earlier steps throws, and fields that {@link #setup()} never got to assign are
   * skipped so a setup failure is not masked by a NullPointerException here.
   */
  @After
  public void teardown() throws IOException, KeeperException {
    try {
      if (master != null) {
        master.stop("");
      }
      if (slm != null) {
        slm.stop();
      }
      if (zkw != null) {
        // The watcher is created per test, so it has to be released per test as well.
        zkw.close();
      }
    } finally {
      TEST_UTIL.shutdownMiniZKCluster();
    }
  }

  @Test
  public void testBatchWaitMillis() {
    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(0));
    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(1));
    assertEquals(1000, SplitLogManager.getBatchWaitTimeMillis(10));
    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(101));
    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(1011));
  }

  /** A value that can be re-read on demand, e.g. a counter or a sum of counters. */
  @FunctionalInterface
  private interface Expr {
    long eval();
  }

  /**
   * Waits for {@code ctr} to move away from {@code oldval} and then asserts it equals
   * {@code newval}.
   * @param ctr    counter to watch
   * @param oldval value the counter is expected to hold before the change
   * @param newval value the counter must hold once it has changed
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
    throws Exception {
    waitForCounter(ctr::sum, oldval, newval, timems);
  }

  /**
   * Polls {@code e} every 10 ms, for at most {@code timems} milliseconds, until it no longer
   * evaluates to {@code oldval}; afterwards asserts that it evaluates to {@code newval}.
   * @param e      expression to evaluate
   * @param oldval value the expression is expected to yield before the change
   * @param newval value the expression must yield once it has changed
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
    throws Exception {
    Waiter.Predicate<Exception> valueChanged = () -> e.eval() != oldval;
    TEST_UTIL.waitFor(timems, 10, valueChanged);

    assertEquals(newval, e.eval());
  }

  /**
   * Returns the manager's task registered under {@code path}. When none is registered yet, a new
   * {@link Task} is inserted and the orphan-task-acquired counter is incremented.
   */
  private Task findOrCreateOrphanTask(String path) {
    return slm.tasks.computeIfAbsent(path, k -> {
      LOG.info("creating orphan task {}", k);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
      return new Task();
    });
  }

  /**
   * Enqueues a split task named {@code name} into {@code batch} and blocks until the matching
   * task znode has been created.
   * @param batch batch the task is installed into
   * @param name  task name; it is encoded into the znode name
   * @return the full path of the task znode
   */
  private String submitTaskAndWait(TaskBatch batch, String name)
    throws KeeperException, InterruptedException {
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
    // Register the listener and set the watch before enqueueing so the creation is not missed.
    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
    zkw.registerListener(listener);
    ZKUtil.watchAndCheckExists(zkw, tasknode);

    slm.enqueueSplitTask(name, batch);
    assertEquals(1, batch.installed);
    Assert.assertSame(batch, findOrCreateOrphanTask(tasknode).batch);
    assertEquals(1L, tot_mgr_node_create_queued.sum());

    LOG.debug("waiting for task node creation");
    listener.waitForCreation();
    LOG.debug("task created");
    return tasknode;
  }

  /**
   * Test whether the splitlog correctly creates a task in zookeeper
   */
  @Test
  public void testTaskCreation() throws Exception {

    LOG.info("TestTaskCreation - test the creation of a task in zk");
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");

    byte[] data = ZKUtil.getData(zkw, tasknode);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    LOG.info("Task node created {}", slt);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * An OWNED task znode that exists before the manager starts is picked up as an orphan and later
   * resubmitted, which also posts a rescan.
   */
  @Test
  public void testOrphanTaskAcquisition() throws Exception {
    LOG.info("TestOrphanTaskAcquisition");

    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
    Task task = findOrCreateOrphanTask(tasknode);
    assertTrue(task.isOrphan());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    assertFalse(task.isUnassigned());
    long curt = EnvironmentEdgeManager.currentTime();
    assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000)));
    LOG.info("waiting for manager to resubmit the orphan task");
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    assertTrue(task.isUnassigned());
    waitForCounter(tot_mgr_rescan, 0, 1, to + to / 2);
  }

  /**
   * An UNASSIGNED task znode that exists before the manager starts is resubmitted once at startup
   * without counting as an unforced resubmit.
   */
  @Test
  public void testUnassignedOrphan() throws Exception {
    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at startup");
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
    // create an unassigned orphan task
    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);
    int version = ZKUtil.checkExists(zkw, tasknode);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
    Task task = findOrCreateOrphanTask(tasknode);
    assertTrue(task.isOrphan());
    assertTrue(task.isUnassigned());
    // wait for RESCAN node to be created
    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
    Task task2 = findOrCreateOrphanTask(tasknode);
    Assert.assertSame(task, task2);
    LOG.debug("task = {}", task);
    assertEquals(1L, tot_mgr_resubmit.sum());
    assertEquals(1, task.incarnation.get());
    assertEquals(0, task.unforcedResubmits.get());
    assertTrue(task.isOrphan());
    assertTrue(task.isUnassigned());
    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
  }

  /**
   * With {@code hbase.splitlog.max.resubmit} set to 2, a task taken by three workers in turn is
   * resubmitted (unforced) exactly twice before the resubmit threshold is reported.
   */
  @Test
  public void testMultipleResubmits() throws Exception {
    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
    conf.setInt("hbase.splitlog.max.resubmit", 2);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
    final ServerName worker3 = ServerName.valueOf("worker3,1,1");

    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);

    slt = new SplitLogTask.Owned(worker2);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 1, 2, to / 2);
    waitForCounter(tot_mgr_resubmit, 1, 2, to + to / 2);
    int version2 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version2 > version1);

    slt = new SplitLogTask.Owned(worker3);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 2, 3, to / 2);
    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to / 2);
    // Give the manager time to (wrongly) resubmit again before checking the final count.
    Thread.sleep(to + to / 2);
    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
  }

  /**
   * After a resubmit the task goes back to UNASSIGNED and the RESCAN node posted for it is
   * eventually deleted.
   */
  @Test
  public void testRescanCleanup() throws Exception {
    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");

    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    // Wait for either outcome of the resubmit so a failed resubmit is reported below
    // instead of surfacing as a plain timeout.
    waitForCounter(() -> tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum(), 0, 1,
      5 * 60000); // wait long enough
    Assert.assertEquals("Could not run test. Lost ZK connection?", 0,
      tot_mgr_resubmit_failed.sum());
    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));

    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to / 2);
  }

  /** A task znode moved to DONE completes its batch and is then deleted. */
  @Test
  public void testTaskDone() throws Exception {
    LOG.info("TestTaskDone - cleanup task node once in DONE state");

    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Done(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    // Block on the batch monitor until every installed task has been counted as done.
    synchronized (batch) {
      while (batch.installed != batch.done) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
    assertEquals(-1, ZKUtil.checkExists(zkw, tasknode));
  }

  /**
   * With resubmits disabled, a task znode moved to ERR is counted as an error in its batch and is
   * then deleted.
   */
  @Test
  public void testTaskErr() throws Exception {
    LOG.info("TestTaskErr - cleanup task node once in ERR state");

    conf.setInt("hbase.splitlog.max.resubmit", 0);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Err(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());

    // Block on the batch monitor until every installed task has been counted as an error.
    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
    assertEquals(-1, ZKUtil.checkExists(zkw, tasknode));
    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
  }

  /** A task znode moved to RESIGNED is resubmitted exactly once and ends up UNASSIGNED. */
  @Test
  public void testTaskResigned() throws Exception {
    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
    // The repeated zero checks pin down that nothing before the RESIGNED write resubmits.
    assertEquals(0, tot_mgr_resubmit.sum());
    slm = new SplitLogManager(master, conf);
    assertEquals(0, tot_mgr_resubmit.sum());
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    assertEquals(0, tot_mgr_resubmit.sum());
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    assertEquals(0, tot_mgr_resubmit.sum());
    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
    assertEquals(0, tot_mgr_resubmit.sum());
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    ZKUtil.checkExists(zkw, tasknode);
    // Could be small race here.
    if (tot_mgr_resubmit.sum() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
    }
    assertEquals(1, tot_mgr_resubmit.sum());

    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * While an orphan task is being heartbeated nothing is resubmitted; once the heartbeats stop the
   * orphan is resubmitted, and with all tasks unassigned the manager resubmits the unassigned ones.
   */
  @Test
  public void testUnassignedTimeout() throws Exception {
    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then resubmit");

    // create an orphan task in OWNED state
    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);

    // submit another task which will stay in unassigned mode
    TaskBatch batch = new TaskBatch();
    submitTaskAndWait(batch, "foo/1");

    // Keep rewriting the orphan's OWNED state (still held by worker1) every 100 ms, for
    // roughly 3 * to milliseconds in total.
    final byte[] ownedByWorker1 = slt.toByteArray();
    for (int i = 0; i < (3 * to) / 100; i++) {
      Thread.sleep(100);
      ZKUtil.setData(zkw, tasknode1, ownedByWorker1);
    }

    // since we have stopped heartbeating the owned node therefore it should
    // get resubmitted
    LOG.info("waiting for manager to resubmit the orphan task");
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);

    // now all the nodes are unassigned. manager should post another rescan
    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to / 2);
  }

  /**
   * Reporting the owning worker as dead resubmits its task even though
   * {@code hbase.splitlog.max.resubmit} is 0, and the task ends up UNASSIGNED.
   */
  @Test
  public void testDeadWorker() throws Exception {
    LOG.info("testDeadWorker");

    conf.setLong("hbase.splitlog.max.resubmit", 0);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    // Each counter may already have moved by the time it is checked, so only wait if it has not.
    if (tot_mgr_heartbeat.sum() == 0) {
      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    }
    slm.handleDeadWorker(worker1);
    if (tot_mgr_resubmit.sum() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    }
    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to / 2);
    }

    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * A task owned by a worker that the server manager stops reporting as online is resubmitted.
   */
  @Test
  public void testWorkerCrash() throws Exception {
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");

    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    if (tot_mgr_heartbeat.sum() == 0) {
      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    }

    // Not yet resubmitted.
    Assert.assertEquals(0, tot_mgr_resubmit.sum());

    // This server becomes dead
    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);

    // NOTE(review): the original comment says the timeout checker runs every 1000 ms
    // (hardcoded), but setup() sets "hbase.splitlog.manager.timeoutmonitor.period" to 100;
    // confirm which period applies before tuning this sleep.
    Thread.sleep(1300);

    // It has been resubmitted
    Assert.assertEquals(1, tot_mgr_resubmit.sum());
  }

  /** Splitting an empty server log directory removes that directory. */
  @Test
  public void testEmptyLogDir() throws Exception {
    LOG.info("testEmptyLogDir");
    slm = new SplitLogManager(master, conf);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path emptyLogDirPath =
      new Path(new Path(fs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME),
        ServerName.valueOf("emptyLogDir", 1, 1).toString());
    fs.mkdirs(emptyLogDirPath);
    slm.splitLogDistributed(emptyLogDirPath);
    assertFalse(fs.exists(emptyLogDirPath));
  }

  /**
   * Splitting a server log directory holding one (empty) log file removes the directory once the
   * split task is reported DONE. A helper thread plays the worker by marking the task DONE.
   */
  @Test
  public void testLogFilesAreArchived() throws Exception {
    LOG.info("testLogFilesAreArchived");
    slm = new SplitLogManager(master, conf);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
    conf.set(HConstants.HBASE_DIR, dir.toString());
    String serverName = ServerName.valueOf("foo", 1, 1).toString();
    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
    fs.mkdirs(logDirPath);
    // create an empty log file
    Path logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString());
    fs.create(logFile).close();

    // Spin up a thread mocking split done: it polls the manager's tasks until at least one
    // task znode has been written as DONE. It is a daemon so that a failure in the main test
    // thread cannot leave it spinning and keep the JVM alive.
    Thread splitDoneMocker = new Thread(() -> {
      final byte[] doneData =
        new SplitLogTask.Done(ServerName.valueOf("worker1,1,1")).toByteArray();
      boolean done = false;
      while (!done) {
        for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
          try {
            ZKUtil.setData(zkw, entry.getKey(), doneData);
            done = true;
          } catch (KeeperException e) {
            // Keep polling; the write is retried on the next pass over the tasks.
            LOG.warn(e.toString(), e);
          }
        }
      }
    }, "mock-split-done");
    splitDoneMocker.setDaemon(true);
    splitDoneMocker.start();

    slm.splitLogDistributed(logDirPath);

    assertFalse(fs.exists(logDirPath));
  }
}