/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link SplitLogManager}. Every test starts its own mini ZooKeeper cluster, writes
 * split-log task znodes by hand to impersonate workers, and observes the manager's reaction
 * through the global {@link SplitLogCounters}.
 */
@Category({MasterTests.class, LargeTests.class})
public class TestSplitLogManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSplitLogManager.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);

  /** Mocked server manager; {@link #setup()} makes it report every server as online. */
  private final ServerManager sm = Mockito.mock(ServerManager.class);

  private ZKWatcher zkw;
  private DummyMasterServices master;
  private SplitLogManager slm;
  private Configuration conf;
  /** Wait budget in milliseconds, derived in {@link #setup()} from the manager timeout. */
  private int to;

  // Re-created by setup() before every test even though it is a static field.
  private static HBaseTestingUtility TEST_UTIL;

  /**
   * Minimal master stand-in that exposes the test's ZooKeeper watcher, a ZK-backed coordinated
   * state manager and the mocked {@link ServerManager}.
   */
  class DummyMasterServices extends MockNoopMasterServices {
    private ZKWatcher zkw;
    private CoordinatedStateManager cm;

    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
      super(conf);
      this.zkw = zkw;
      cm = new ZkCoordinatedStateManager(this);
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return cm;
    }

    @Override
    public ServerManager getServerManager() {
      return sm;
    }
  }

  /**
   * Starts a mini ZK cluster, creates the base and split-log znodes, resets the split-log
   * counters and configures the manager timeouts used by the tests.
   */
  @Before
  public void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniZKCluster();
    conf = TEST_UTIL.getConfiguration();
    // Use a different ZK wrapper instance for each test.
    zkw =
        new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
    master = new DummyMasterServices(zkw, conf);

    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
    LOG.debug("{} created", zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
    LOG.debug("{} created", zkw.getZNodePaths().splitLogZNode);

    resetCounters();

    // By default, we let the test manage the error as before, so the server
    // does not appear as dead from the master point of view, only from the split log pov.
    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);

    to = 12000;
    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);

    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
    // Pad the wait budget; presumably 16 timeout-monitor periods of 100 ms each.
    to = to + 16 * 100;
  }

  /**
   * Stops the master stub and the manager, closes the per-test ZK watcher and shuts the mini ZK
   * cluster down. The cluster shutdown runs even when an earlier step throws, and the null guards
   * keep a failed {@link #setup()} from being masked by a NullPointerException here.
   */
  @After
  public void teardown() throws IOException, KeeperException {
    try {
      if (master != null) {
        master.stop("");
      }
      if (slm != null) {
        slm.stop();
      }
    } finally {
      try {
        if (zkw != null) {
          zkw.close();
        }
      } finally {
        TEST_UTIL.shutdownMiniZKCluster();
      }
    }
  }

  /** Pins the values returned by {@link SplitLogManager#getBatchWaitTimeMillis} for a few inputs. */
  @Test
  public void testBatchWaitMillis() {
    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(0));
    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(1));
    assertEquals(1000, SplitLogManager.getBatchWaitTimeMillis(10));
    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(101));
    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(1011));
  }

  /** A value that can be polled repeatedly, e.g. a counter or a sum of counters. */
  private interface Expr {
    long eval();
  }

  /**
   * Waits until {@code ctr} no longer equals {@code oldval}, then asserts it equals
   * {@code newval}.
   *
   * @param ctr counter to poll
   * @param oldval value the counter is expected to move away from
   * @param newval value the counter must have once it has changed
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
      throws Exception {
    waitForCounter(ctr::sum, oldval, newval, timems);
  }

  /**
   * Polls {@code e} every 10 ms until it no longer evaluates to {@code oldval} (or
   * {@code timems} elapses), then asserts that it evaluates to {@code newval}.
   */
  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
      throws Exception {
    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return e.eval() != oldval;
      }
    });

    assertEquals(newval, e.eval());
  }

  /**
   * Returns the manager's task registered under {@code path}; when there is none, registers a
   * fresh {@link Task} and bumps the orphan-acquired counter.
   */
  private Task findOrCreateOrphanTask(String path) {
    return slm.tasks.computeIfAbsent(path, k -> {
      LOG.info("creating orphan task {}", k);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
      return new Task();
    });
  }

  /**
   * Enqueues a split task named {@code name} in {@code batch} and blocks until its znode has been
   * created.
   *
   * @return the encoded znode path of the task
   */
  private String submitTaskAndWait(TaskBatch batch, String name)
      throws KeeperException, InterruptedException {
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
    // Register the listener and set the watch before enqueueing so the creation cannot be missed.
    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
    zkw.registerListener(listener);
    ZKUtil.watchAndCheckExists(zkw, tasknode);

    slm.enqueueSplitTask(name, batch);
    assertEquals(1, batch.installed);
    Assert.assertSame(batch, findOrCreateOrphanTask(tasknode).batch);
    assertEquals(1L, tot_mgr_node_create_queued.sum());

    LOG.debug("waiting for task node creation");
    listener.waitForCreation();
    LOG.debug("task created");
    return tasknode;
  }

  /**
   * Test whether the splitlog correctly creates a task in zookeeper
   */
  @Test
  public void testTaskCreation() throws Exception {
    LOG.info("TestTaskCreation - test the creation of a task in zk");
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");

    byte[] data = ZKUtil.getData(zkw, tasknode);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    LOG.info("Task node created {}", slt);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * An OWNED task znode that exists before the manager starts is picked up as an orphan,
   * heartbeated once, and then resubmitted and rescanned.
   */
  @Test
  public void testOrphanTaskAcquisition() throws Exception {
    LOG.info("TestOrphanTaskAcquisition");

    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
    Task task = findOrCreateOrphanTask(tasknode);
    assertTrue(task.isOrphan());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    assertFalse(task.isUnassigned());
    long curt = System.currentTimeMillis();
    // The heartbeat must have been recorded within the last second.
    assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000)));
    LOG.info("waiting for manager to resubmit the orphan task");
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    assertTrue(task.isUnassigned());
    waitForCounter(tot_mgr_rescan, 0, 1, to + to / 2);
  }

  /**
   * An UNASSIGNED task znode that exists before the manager starts is acquired as an orphan and
   * resubmitted once; the znode version increases.
   */
  @Test
  public void testUnassignedOrphan() throws Exception {
    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
        " startup");
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
    // create an unassigned orphan task
    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);
    int version = ZKUtil.checkExists(zkw, tasknode);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
    Task task = findOrCreateOrphanTask(tasknode);
    assertTrue(task.isOrphan());
    assertTrue(task.isUnassigned());
    // wait for RESCAN node to be created
    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
    Task task2 = findOrCreateOrphanTask(tasknode);
    Assert.assertSame(task, task2);
    LOG.debug("task = {}", task);
    assertEquals(1L, tot_mgr_resubmit.sum());
    assertEquals(1, task.incarnation.get());
    assertEquals(0, task.unforcedResubmits.get());
    assertTrue(task.isOrphan());
    assertTrue(task.isUnassigned());
    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
  }

  /**
   * With {@code hbase.splitlog.max.resubmit} set to 2, a task that keeps getting re-owned is
   * resubmitted (unforced) exactly twice and then hits the resubmit threshold.
   */
  @Test
  public void testMultipleResubmits() throws Exception {
    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
    conf.setInt("hbase.splitlog.max.resubmit", 2);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    slt = new SplitLogTask.Owned(worker2);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 1, 2, to / 2);
    waitForCounter(tot_mgr_resubmit, 1, 2, to + to / 2);
    int version2 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version2 > version1);
    slt = new SplitLogTask.Owned(worker3);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 2, 3, to / 2);
    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to / 2);
    // Deliberate fixed sleep: give the manager time to (wrongly) resubmit again before checking
    // that the unforced resubmit count stayed at the configured maximum.
    Thread.sleep(to + to / 2);
    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
  }

  /**
   * After a resubmit, the task goes back to UNASSIGNED and the RESCAN node is deleted.
   */
  @Test
  public void testRescanCleanup() throws Exception {
    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");

    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    // Wait for either outcome so a failed resubmit is reported instead of timing out.
    waitForCounter(() -> tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum(),
      0, 1, 5 * 60000); // wait long enough
    Assert.assertEquals("Could not run test. Lost ZK connection?",
      0, tot_mgr_resubmit_failed.sum());
    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));

    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to / 2);
  }

  /**
   * A task whose znode is set to DONE completes its batch and has its znode deleted.
   */
  @Test
  public void testTaskDone() throws Exception {
    LOG.info("TestTaskDone - cleanup task node once in DONE state");

    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Done(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    synchronized (batch) {
      while (batch.installed != batch.done) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
    assertEquals(-1, ZKUtil.checkExists(zkw, tasknode));
  }

  /**
   * With resubmits disabled, a task whose znode is set to ERR is counted as an error in its batch
   * and has its znode deleted.
   */
  @Test
  public void testTaskErr() throws Exception {
    LOG.info("TestTaskErr - cleanup task node once in ERR state");

    conf.setInt("hbase.splitlog.max.resubmit", 0);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Err(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());

    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
    assertEquals(-1, ZKUtil.checkExists(zkw, tasknode));
    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
  }

  /**
   * A task whose znode is set to RESIGNED is resubmitted exactly once and returns to UNASSIGNED.
   */
  @Test
  public void testTaskResigned() throws Exception {
    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
    assertEquals(0, tot_mgr_resubmit.sum());
    slm = new SplitLogManager(master, conf);
    assertEquals(0, tot_mgr_resubmit.sum());
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    assertEquals(0, tot_mgr_resubmit.sum());
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    assertEquals(0, tot_mgr_resubmit.sum());
    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
    assertEquals(0, tot_mgr_resubmit.sum());
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    ZKUtil.checkExists(zkw, tasknode);
    // Could be small race here.
    if (tot_mgr_resubmit.sum() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
    }
    assertEquals(1, tot_mgr_resubmit.sum());

    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * While an owned orphan task is being heartbeated, an unassigned sibling task is left alone;
   * once the heartbeats stop the orphan is resubmitted, and with every task unassigned the
   * manager performs an "unassigned" resubmit.
   */
  @Test
  public void testUnassignedTimeout() throws Exception {
    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
        " resubmit");

    // create an orphan task in OWNED state
    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);

    // submit another task which will stay in unassigned mode
    TaskBatch batch = new TaskBatch();
    submitTaskAndWait(batch, "foo/1");

    // Heartbeat the owned orphan node every 100 ms, as the same worker, for roughly 3 * to ms.
    for (int i = 0; i < (3 * to) / 100; i++) {
      Thread.sleep(100);
      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
    }

    // since we have stopped heartbeating the owned node therefore it should
    // get resubmitted
    LOG.info("waiting for manager to resubmit the orphan task");
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);

    // now all the nodes are unassigned. manager should post another rescan
    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to / 2);
  }

  /**
   * Reporting the owning worker as dead through {@link SplitLogManager#handleDeadWorker} causes
   * the task to be resubmitted even though {@code hbase.splitlog.max.resubmit} is 0.
   */
  @Test
  public void testDeadWorker() throws Exception {
    LOG.info("testDeadWorker");

    conf.setInt("hbase.splitlog.max.resubmit", 0);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    if (tot_mgr_heartbeat.sum() == 0) {
      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    }
    slm.handleDeadWorker(worker1);
    if (tot_mgr_resubmit.sum() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    }
    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to / 2);
    }

    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * When the server manager starts reporting the owning worker as offline, the task is
   * resubmitted without waiting for the manager timeout.
   */
  @Test
  public void testWorkerCrash() throws Exception {
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");

    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    if (tot_mgr_heartbeat.sum() == 0) {
      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    }

    // Not yet resubmitted.
    Assert.assertEquals(0, tot_mgr_resubmit.sum());

    // This server becomes dead
    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);

    // Poll instead of sleeping a fixed interval. The bound (to / 2) is below the 12000 ms manager
    // timeout configured in setup(), so a resubmit seen here is not the ordinary timeout path.
    waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
  }

  /** Splitting an empty log directory removes the directory. */
  @Test
  public void testEmptyLogDir() throws Exception {
    LOG.info("testEmptyLogDir");
    slm = new SplitLogManager(master, conf);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(),
      HConstants.HREGION_LOGDIR_NAME),
      ServerName.valueOf("emptyLogDir", 1, 1).toString());
    fs.mkdirs(emptyLogDirPath);
    slm.splitLogDistributed(emptyLogDirPath);
    assertFalse(fs.exists(emptyLogDirPath));
  }

  /**
   * Splits a log directory holding one empty log file while a helper thread marks the resulting
   * task DONE, and checks that the log directory is gone afterwards.
   */
  @Test
  public void testLogFilesAreArchived() throws Exception {
    LOG.info("testLogFilesAreArchived");
    slm = new SplitLogManager(master, conf);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
    conf.set(HConstants.HBASE_DIR, dir.toString());
    String serverName = ServerName.valueOf("foo", 1, 1).toString();
    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
    fs.mkdirs(logDirPath);
    // create an empty log file
    Path logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString());
    fs.create(logFile).close();

    // spin up a thread mocking split done.
    Thread splitDoneMocker = new Thread("mock-split-done") {
      @Override
      public void run() {
        boolean done = false;
        // Keep scanning until at least one task znode has been flipped to DONE.
        while (!done) {
          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
            final ServerName worker1 = ServerName.valueOf("worker1,1,1");
            SplitLogTask slt = new SplitLogTask.Done(worker1);
            boolean encounteredZKException = false;
            try {
              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
            } catch (KeeperException e) {
              LOG.warn(e.toString(), e);
              encounteredZKException = true;
            }
            if (!encounteredZKException) {
              done = true;
            }
          }
        }
      }
    };
    // Daemon so that a failing test cannot leave this polling loop keeping the JVM alive.
    splitDoneMocker.setDaemon(true);
    splitDoneMocker.start();

    slm.splitLogDistributed(logDirPath);

    assertFalse(fs.exists(logDirPath));
  }
}