/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link SplitLogManager} run against a mini ZooKeeper cluster. Each test gets its own
 * {@link ZKWatcher} and a mocked {@link ServerManager}. Tests observe the manager through the task
 * znodes it writes and through the global {@link SplitLogCounters}, which are reset before every
 * test.
 */
@Tag(MasterTests.TAG)
@Tag(LargeTests.TAG)
public class TestSplitLogManager {

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);

  /** Mocked server manager; {@link #setup()} makes it report every server as online. */
  private final ServerManager sm = Mockito.mock(ServerManager.class);

  private ZKWatcher zkw;
  private DummyMasterServices master;
  private SplitLogManager slm;
  private Configuration conf;
  /**
   * Base wait budget in ms: the split log manager timeout plus 16 timeout-monitor periods (see
   * {@link #setup()}).
   */
  private int to;

  private static HBaseTestingUtil TEST_UTIL;

  /**
   * Minimal master services wired to the per-test {@link ZKWatcher}, a ZooKeeper-based
   * {@link CoordinatedStateManager} and the mocked {@link ServerManager}.
   */
  class DummyMasterServices extends MockNoopMasterServices {
    private final ZKWatcher zkw;
    private final CoordinatedStateManager cm;

    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
      super(conf);
      this.zkw = zkw;
      cm = new ZkCoordinatedStateManager(this);
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return cm;
    }

    @Override
    public ServerManager getServerManager() {
      return sm;
    }
  }

  /**
   * Starts a fresh mini ZK cluster and creates the base and split-log znodes. Also resets the
   * split-log counters and configures the manager and unassigned timeouts plus a 100 ms timeout
   * monitor period.
   */
  @BeforeEach
  public void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
    TEST_UTIL.startMiniZKCluster();
    conf = TEST_UTIL.getConfiguration();
    // Use a different ZK wrapper instance for each test.
    zkw =
      new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
    master = new DummyMasterServices(zkw, conf);

    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
    assertNotEquals(-1, ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode));
    LOG.debug("{} created", zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
    assertNotEquals(-1, ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode));
    LOG.debug("{} created", zkw.getZNodePaths().splitLogZNode);

    resetCounters();

    // By default, we let the test manage the error as before, so the server
    // does not appear as dead from the master point of view, only from the split log pov.
    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);

    to = 12000;
    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);

    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
    // Give waits headroom of 16 timeout-monitor periods on top of the manager timeout.
    to = to + 16 * 100;
  }

  /**
   * Stops the master services and the split log manager, then closes the per-test ZooKeeper
   * watcher and shuts down the mini ZK cluster. Null checks keep a failed {@link #setup()} from
   * masking its root cause with an NPE here.
   */
  @AfterEach
  public void teardown() throws IOException, KeeperException {
    if (master != null) {
      master.stop("");
    }
    if (slm != null) {
      slm.stop();
    }
    if (zkw != null) {
      // Each test opens its own watcher; close it so its ZK session does not leak.
      zkw.close();
    }
    TEST_UTIL.shutdownMiniZKCluster();
  }

  @Test
  public void testBatchWaitMillis() {
    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(0));
    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(1));
    assertEquals(1000, SplitLogManager.getBatchWaitTimeMillis(10));
    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(101));
    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(1011));
  }

  /**
   * Waits until {@code ctr} moves away from {@code oldval}, then asserts that it equals
   * {@code newval}.
   * @param ctr    counter to watch
   * @param oldval value the counter is expected to leave
   * @param newval exact value expected once it has changed
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
    throws Exception {
    waitForCounter(ctr::sum, oldval, newval, timems);
  }

  /**
   * Polls {@code expr} every 10 ms, for at most {@code timems} ms, until its value differs from
   * {@code oldval}. Then asserts that the value is exactly {@code newval}.
   * @param expr   value source to poll
   * @param oldval value the expression is expected to leave
   * @param newval exact value expected once it has changed
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(final LongSupplier expr, final long oldval, long newval, long timems)
    throws Exception {
    Waiter.Predicate<Exception> changed = () -> expr.getAsLong() != oldval;
    TEST_UTIL.waitFor(timems, 10, changed);
    assertEquals(newval, expr.getAsLong());
  }

  /**
   * Returns the manager's task for {@code path}. If none exists yet, registers a fresh
   * {@link Task} and increments {@code tot_mgr_orphan_task_acquired}.
   */
  private Task findOrCreateOrphanTask(String path) {
    return slm.tasks.computeIfAbsent(path, k -> {
      LOG.info("creating orphan task {}", k);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
      return new Task();
    });
  }

  /**
   * Enqueues {@code name} into {@code batch} and blocks until the corresponding task znode has
   * been created.
   * @return the encoded task znode path
   */
  private String submitTaskAndWait(TaskBatch batch, String name)
    throws KeeperException, InterruptedException {
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
    zkw.registerListener(listener);
    ZKUtil.watchAndCheckExists(zkw, tasknode);

    slm.enqueueSplitTask(name, batch);
    assertEquals(1, batch.installed);
    assertSame(batch, findOrCreateOrphanTask(tasknode).batch);
    assertEquals(1L, tot_mgr_node_create_queued.sum());

    LOG.debug("waiting for task node creation");
    listener.waitForCreation();
    LOG.debug("task created");
    return tasknode;
  }

  /**
   * Test whether the splitlog correctly creates a task in zookeeper.
   */
  @Test
  public void testTaskCreation() throws Exception {
    LOG.info("TestTaskCreation - test the creation of a task in zk");
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");

    byte[] data = ZKUtil.getData(zkw, tasknode);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    LOG.info("Task node created {}", slt);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * An OWNED task znode that exists before the manager starts is acquired as an orphan and
   * heartbeated. It is later resubmitted, after which a RESCAN is posted.
   */
  @Test
  public void testOrphanTaskAcquisition() throws Exception {
    LOG.info("TestOrphanTaskAcquisition");

    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
    Task task = findOrCreateOrphanTask(tasknode);
    assertTrue(task.isOrphan());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    assertFalse(task.isUnassigned());
    long curt = EnvironmentEdgeManager.currentTime();
    assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000)));
    LOG.info("waiting for manager to resubmit the orphan task");
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    assertTrue(task.isUnassigned());
    waitForCounter(tot_mgr_rescan, 0, 1, to + to / 2);
  }

  /**
   * An UNASSIGNED orphan task found at startup is resubmitted. Its znode version increases, a
   * RESCAN is posted, and its incarnation becomes 1 without any unforced resubmits.
   */
  @Test
  public void testUnassignedOrphan() throws Exception {
    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at startup");
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
    // create an unassigned orphan task
    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);
    int version = ZKUtil.checkExists(zkw, tasknode);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
    Task task = findOrCreateOrphanTask(tasknode);
    assertTrue(task.isOrphan());
    assertTrue(task.isUnassigned());
    // wait for RESCAN node to be created
    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
    Task task2 = findOrCreateOrphanTask(tasknode);
    assertSame(task, task2);
    LOG.debug("task = {}", task);
    assertEquals(1L, tot_mgr_resubmit.sum());
    assertEquals(1, task.incarnation.get());
    assertEquals(0, task.unforcedResubmits.get());
    assertTrue(task.isOrphan());
    assertTrue(task.isUnassigned());
    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
  }

  /**
   * With {@code hbase.splitlog.max.resubmit} set to 2, a task whose owners keep timing out is
   * resubmitted twice. The third time, the resubmit threshold is reached and it is not resubmitted
   * again (except by force).
   */
  @Test
  public void testMultipleResubmits() throws Exception {
    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
    conf.setInt("hbase.splitlog.max.resubmit", 2);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    slt = new SplitLogTask.Owned(worker2);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 1, 2, to / 2);
    waitForCounter(tot_mgr_resubmit, 1, 2, to + to / 2);
    int version2 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version2 > version1);
    slt = new SplitLogTask.Owned(worker3);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 2, 3, to / 2);
    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to / 2);
    // Give the manager time to (wrongly) resubmit again before checking it did not.
    Thread.sleep(to + to / 2);
    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
  }

  /**
   * After an owned task times out and is resubmitted, it becomes UNASSIGNED again and the RESCAN
   * node posted for it is deleted.
   */
  @Test
  public void testRescanCleanup() throws Exception {
    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");

    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    waitForCounter(() -> tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum(), 0, 1,
      5 * 60000); // wait long enough
    assertEquals(0, tot_mgr_resubmit_failed.sum(), "Could not run test. Lost ZK connection?");
    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));

    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to / 2);
  }

  /**
   * A task moved to DONE completes its batch, and its znode is then deleted.
   */
  @Test
  public void testTaskDone() throws Exception {
    LOG.info("TestTaskDone - cleanup task node once in DONE state");

    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Done(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    synchronized (batch) {
      while (batch.installed != batch.done) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
    assertEquals(-1, ZKUtil.checkExists(zkw, tasknode));
  }

  /**
   * With no resubmits allowed, a task moved to ERR counts as a batch error, and its znode is then
   * deleted.
   */
  @Test
  public void testTaskErr() throws Exception {
    LOG.info("TestTaskErr - cleanup task node once in ERR state");

    conf.setInt("hbase.splitlog.max.resubmit", 0);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Err(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());

    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
    assertEquals(-1, ZKUtil.checkExists(zkw, tasknode));
    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
  }

  /**
   * A task moved to RESIGNED is resubmitted exactly once and returns to UNASSIGNED.
   */
  @Test
  public void testTaskResigned() throws Exception {
    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
    assertEquals(0, tot_mgr_resubmit.sum());
    slm = new SplitLogManager(master, conf);
    assertEquals(0, tot_mgr_resubmit.sum());
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    assertEquals(0, tot_mgr_resubmit.sum());
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    assertEquals(0, tot_mgr_resubmit.sum());
    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
    assertEquals(0, tot_mgr_resubmit.sum());
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    ZKUtil.checkExists(zkw, tasknode);
    // The resubmit may already have happened by now; only wait if it has not.
    if (tot_mgr_resubmit.sum() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
    }
    assertEquals(1, tot_mgr_resubmit.sum());

    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * An orphan OWNED task is heartbeated for a while and then left alone, so it gets resubmitted.
   * Once every task is unassigned, the manager resubmits the unassigned tasks.
   */
  @Test
  public void testUnassignedTimeout() throws Exception {
    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then resubmit");

    // create an orphan task in OWNED state
    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);

    // submit another task which will stay in unassigned mode
    TaskBatch batch = new TaskBatch();
    submitTaskAndWait(batch, "foo/1");

    // Keep heartbeating the orphan OWNED node (same owner, every 100 ms, for about 3 * to ms)
    // while the other task stays unassigned.
    for (int i = 0; i < (3 * to) / 100; i++) {
      Thread.sleep(100);
      slt = new SplitLogTask.Owned(worker1);
      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
    }

    // since we have stopped heartbeating the owned node therefore it should
    // get resubmitted
    LOG.info("waiting for manager to resubmit the orphan task");
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);

    // now all the nodes are unassigned. manager should post another rescan
    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to / 2);
  }

  /**
   * {@link SplitLogManager#handleDeadWorker} resubmits a task owned by the dead worker, even with
   * {@code hbase.splitlog.max.resubmit} set to 0, and the task returns to UNASSIGNED.
   */
  @Test
  public void testDeadWorker() throws Exception {
    LOG.info("testDeadWorker");

    conf.setInt("hbase.splitlog.max.resubmit", 0);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    if (tot_mgr_heartbeat.sum() == 0) {
      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    }
    slm.handleDeadWorker(worker1);
    if (tot_mgr_resubmit.sum() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    }
    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to / 2);
    }

    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * Once the {@link ServerManager} reports the owning worker as offline, the task is resubmitted
   * without waiting for the task timeout.
   */
  @Test
  public void testWorkerCrash() throws Exception {
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");

    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    if (tot_mgr_heartbeat.sum() == 0) {
      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    }

    // Not yet resubmitted.
    assertEquals(0, tot_mgr_resubmit.sum());

    // This server becomes dead
    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);

    // The periodic timeout checker should notice the dead owner and resubmit exactly once. Poll
    // for it instead of sleeping a fixed interval, which made the test timing-sensitive. The wait
    // (to / 2) stays below the task timeout, so this resubmit is attributable to the dead server.
    waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
  }

  /**
   * Splitting an empty WAL directory removes that directory.
   */
  @Test
  public void testEmptyLogDir() throws Exception {
    LOG.info("testEmptyLogDir");
    slm = new SplitLogManager(master, conf);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path emptyLogDirPath =
      new Path(new Path(fs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME),
        ServerName.valueOf("emptyLogDir", 1, 1).toString());
    fs.mkdirs(emptyLogDirPath);
    slm.splitLogDistributed(emptyLogDirPath);
    assertFalse(fs.exists(emptyLogDirPath));
  }

  /**
   * Runs a distributed split over a WAL directory holding one empty log file, with a helper
   * thread acting as a worker that marks the task DONE. After the split the WAL directory no
   * longer exists.
   */
  @Test
  public void testLogFilesAreArchived() throws Exception {
    LOG.info("testLogFilesAreArchived");
    slm = new SplitLogManager(master, conf);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
    conf.set(HConstants.HBASE_DIR, dir.toString());
    String serverName = ServerName.valueOf("foo", 1, 1).toString();
    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
    fs.mkdirs(logDirPath);
    // create an empty log file
    Path logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString());
    fs.create(logFile).close();

    // Spin up a thread mocking split done. It is a daemon and is interrupted once the split
    // returns (or fails), so it can never outlive the test spinning forever.
    Thread doneMarker = new Thread(this::markTasksDoneUntilSuccess, "mock-split-done");
    doneMarker.setDaemon(true);
    doneMarker.start();
    try {
      slm.splitLogDistributed(logDirPath);
    } finally {
      doneMarker.interrupt();
    }

    assertFalse(fs.exists(logDirPath));
  }

  /**
   * Repeatedly sets every task currently known to {@link #slm} to DONE (owned by
   * "worker1,1,1"). Stops once a pass has had at least one successful ZK update, or when
   * interrupted. Sleeps briefly between unsuccessful passes instead of busy-spinning.
   */
  private void markTasksDoneUntilSuccess() {
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    boolean done = false;
    while (!done && !Thread.currentThread().isInterrupted()) {
      for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
        SplitLogTask slt = new SplitLogTask.Done(worker1);
        try {
          ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
          done = true;
        } catch (KeeperException e) {
          LOG.warn(e.toString(), e);
        }
      }
      if (!done) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          // Restore the flag so the loop condition observes it and exits.
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}