001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; 021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; 022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; 023import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; 024import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; 025import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; 026import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit; 027import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task; 028import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed; 029import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force; 030import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached; 031import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned; 032import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted; 033import static org.junit.Assert.assertEquals; 034import static org.junit.Assert.assertFalse; 035import static org.junit.Assert.assertTrue; 036 037import java.io.IOException; 038import java.util.Map; 039import java.util.UUID; 040import java.util.concurrent.atomic.LongAdder; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.CoordinatedStateManager; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseTestingUtility; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.ServerName; 049import org.apache.hadoop.hbase.SplitLogCounters; 050import org.apache.hadoop.hbase.SplitLogTask; 051import org.apache.hadoop.hbase.Waiter; 052import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; 053import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; 054import org.apache.hadoop.hbase.master.SplitLogManager.Task; 055import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; 056import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; 057import org.apache.hadoop.hbase.testclassification.MasterTests; 058import org.apache.hadoop.hbase.testclassification.MediumTests; 059import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 060import org.apache.hadoop.hbase.zookeeper.ZKUtil; 061import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 062import org.apache.zookeeper.CreateMode; 063import org.apache.zookeeper.KeeperException; 064import org.apache.zookeeper.ZooDefs.Ids; 065import org.junit.After; 066import org.junit.Assert; 067import org.junit.Before; 068import org.junit.ClassRule; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.mockito.Mockito; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075@Category({MasterTests.class, MediumTests.class}) 076public class TestSplitLogManager { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestSplitLogManager.class); 081 082 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class); 083 084 private final ServerManager sm = Mockito.mock(ServerManager.class); 085 086 private ZKWatcher zkw; 087 private DummyMasterServices master; 088 private SplitLogManager slm; 089 private Configuration conf; 090 private int to; 091 092 private static HBaseTestingUtility TEST_UTIL; 093 094 class DummyMasterServices extends MockNoopMasterServices { 095 private ZKWatcher zkw; 096 private CoordinatedStateManager cm; 097 098 public DummyMasterServices(ZKWatcher zkw, Configuration conf) { 099 super(conf); 100 this.zkw = zkw; 101 cm = new ZkCoordinatedStateManager(this); 102 } 103 104 @Override 105 public ZKWatcher getZooKeeper() { 106 return zkw; 107 } 108 109 @Override 110 public CoordinatedStateManager getCoordinatedStateManager() { 111 return cm; 112 } 113 114 @Override 115 public ServerManager getServerManager() { 116 return sm; 117 } 118 } 119 120 @Before 121 public void setup() throws Exception { 122 TEST_UTIL = new HBaseTestingUtility(); 123 TEST_UTIL.startMiniZKCluster(); 124 conf = TEST_UTIL.getConfiguration(); 125 // Use a different ZK wrapper instance for each tests. 126 zkw = 127 new ZKWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null); 128 master = new DummyMasterServices(zkw, conf); 129 130 ZKUtil.deleteChildrenRecursively(zkw, zkw.znodePaths.baseZNode); 131 ZKUtil.createAndFailSilent(zkw, zkw.znodePaths.baseZNode); 132 assertTrue(ZKUtil.checkExists(zkw, zkw.znodePaths.baseZNode) != -1); 133 LOG.debug(zkw.znodePaths.baseZNode + " created"); 134 ZKUtil.createAndFailSilent(zkw, zkw.znodePaths.splitLogZNode); 135 assertTrue(ZKUtil.checkExists(zkw, zkw.znodePaths.splitLogZNode) != -1); 136 LOG.debug(zkw.znodePaths.splitLogZNode + " created"); 137 138 resetCounters(); 139 140 // By default, we let the test manage the error as before, so the server 141 // does not appear as dead from the master point of view, only from the split log pov. 142 Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true); 143 144 to = 12000; 145 conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to); 146 conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); 147 148 conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); 149 to = to + 16 * 100; 150 } 151 152 @After 153 public void teardown() throws IOException, KeeperException { 154 master.stop(""); 155 if (slm != null) slm.stop(); 156 TEST_UTIL.shutdownMiniZKCluster(); 157 } 158 159 private interface Expr { 160 long eval(); 161 } 162 163 private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems) 164 throws Exception { 165 Expr e = new Expr() { 166 @Override 167 public long eval() { 168 return ctr.sum(); 169 } 170 }; 171 waitForCounter(e, oldval, newval, timems); 172 return; 173 } 174 175 private void waitForCounter(final Expr e, final long oldval, long newval, long timems) 176 throws Exception { 177 178 TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() { 179 @Override 180 public boolean evaluate() throws Exception { 181 return (e.eval() != oldval); 182 } 183 }); 184 185 assertEquals(newval, e.eval()); 186 } 187 188 private Task findOrCreateOrphanTask(String path) { 189 return slm.tasks.computeIfAbsent(path, k -> { 190 LOG.info("creating orphan task " + k); 191 SplitLogCounters.tot_mgr_orphan_task_acquired.increment(); 192 return new Task(); 193 }); 194 } 195 196 private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, 197 InterruptedException { 198 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); 199 NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); 200 zkw.registerListener(listener); 201 ZKUtil.watchAndCheckExists(zkw, tasknode); 202 203 slm.enqueueSplitTask(name, batch); 204 assertEquals(1, batch.installed); 205 assertTrue(findOrCreateOrphanTask(tasknode).batch == batch); 206 assertEquals(1L, tot_mgr_node_create_queued.sum()); 207 208 LOG.debug("waiting for task node creation"); 209 listener.waitForCreation(); 210 LOG.debug("task created"); 211 return tasknode; 212 } 213 214 /** 215 * Test whether the splitlog correctly creates a task in zookeeper 216 * @throws Exception 217 */ 218 @Test 219 public void testTaskCreation() throws Exception { 220 221 LOG.info("TestTaskCreation - test the creation of a task in zk"); 222 slm = new SplitLogManager(master, conf); 223 TaskBatch batch = new TaskBatch(); 224 225 String tasknode = submitTaskAndWait(batch, "foo/1"); 226 227 byte[] data = ZKUtil.getData(zkw, tasknode); 228 SplitLogTask slt = SplitLogTask.parseFrom(data); 229 LOG.info("Task node created " + slt.toString()); 230 assertTrue(slt.isUnassigned(master.getServerName())); 231 } 232 233 @Test 234 public void testOrphanTaskAcquisition() throws Exception { 235 LOG.info("TestOrphanTaskAcquisition"); 236 237 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); 238 SplitLogTask slt = new SplitLogTask.Owned(master.getServerName()); 239 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, 240 CreateMode.PERSISTENT); 241 242 slm = new SplitLogManager(master, conf); 243 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); 244 Task task = findOrCreateOrphanTask(tasknode); 245 assertTrue(task.isOrphan()); 246 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); 247 assertFalse(task.isUnassigned()); 248 long curt = System.currentTimeMillis(); 249 assertTrue((task.last_update <= curt) && 250 (task.last_update > (curt - 1000))); 251 LOG.info("waiting for manager to resubmit the orphan task"); 252 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); 253 assertTrue(task.isUnassigned()); 254 waitForCounter(tot_mgr_rescan, 0, 1, to + to/2); 255 } 256 257 @Test 258 public void testUnassignedOrphan() throws Exception { 259 LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + 260 " startup"); 261 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); 262 //create an unassigned orphan task 263 SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName()); 264 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, 265 CreateMode.PERSISTENT); 266 int version = ZKUtil.checkExists(zkw, tasknode); 267 268 slm = new SplitLogManager(master, conf); 269 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); 270 Task task = findOrCreateOrphanTask(tasknode); 271 assertTrue(task.isOrphan()); 272 assertTrue(task.isUnassigned()); 273 // wait for RESCAN node to be created 274 waitForCounter(tot_mgr_rescan, 0, 1, to / 2); 275 Task task2 = findOrCreateOrphanTask(tasknode); 276 assertTrue(task == task2); 277 LOG.debug("task = " + task); 278 assertEquals(1L, tot_mgr_resubmit.sum()); 279 assertEquals(1, task.incarnation.get()); 280 assertEquals(0, task.unforcedResubmits.get()); 281 assertTrue(task.isOrphan()); 282 assertTrue(task.isUnassigned()); 283 assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); 284 } 285 286 @Test 287 public void testMultipleResubmits() throws Exception { 288 LOG.info("TestMultipleResbmits - no indefinite resubmissions"); 289 conf.setInt("hbase.splitlog.max.resubmit", 2); 290 slm = new SplitLogManager(master, conf); 291 TaskBatch batch = new TaskBatch(); 292 293 String tasknode = submitTaskAndWait(batch, "foo/1"); 294 int version = ZKUtil.checkExists(zkw, tasknode); 295 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 296 final ServerName worker2 = ServerName.valueOf("worker2,1,1"); 297 final ServerName worker3 = ServerName.valueOf("worker3,1,1"); 298 SplitLogTask slt = new SplitLogTask.Owned(worker1); 299 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 300 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); 301 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); 302 int version1 = ZKUtil.checkExists(zkw, tasknode); 303 assertTrue(version1 > version); 304 slt = new SplitLogTask.Owned(worker2); 305 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 306 waitForCounter(tot_mgr_heartbeat, 1, 2, to/2); 307 waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2); 308 int version2 = ZKUtil.checkExists(zkw, tasknode); 309 assertTrue(version2 > version1); 310 slt = new SplitLogTask.Owned(worker3); 311 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 312 waitForCounter(tot_mgr_heartbeat, 2, 3, to/2); 313 waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2); 314 Thread.sleep(to + to/2); 315 assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum()); 316 } 317 318 @Test 319 public void testRescanCleanup() throws Exception { 320 LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); 321 322 slm = new SplitLogManager(master, conf); 323 TaskBatch batch = new TaskBatch(); 324 325 String tasknode = submitTaskAndWait(batch, "foo/1"); 326 int version = ZKUtil.checkExists(zkw, tasknode); 327 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 328 SplitLogTask slt = new SplitLogTask.Owned(worker1); 329 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 330 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); 331 waitForCounter(new Expr() { 332 @Override 333 public long eval() { 334 return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum()); 335 } 336 }, 0, 1, 5*60000); // wait long enough 337 Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.sum()); 338 int version1 = ZKUtil.checkExists(zkw, tasknode); 339 assertTrue(version1 > version); 340 byte[] taskstate = ZKUtil.getData(zkw, tasknode); 341 slt = SplitLogTask.parseFrom(taskstate); 342 assertTrue(slt.isUnassigned(master.getServerName())); 343 344 waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2); 345 } 346 347 @Test 348 public void testTaskDone() throws Exception { 349 LOG.info("TestTaskDone - cleanup task node once in DONE state"); 350 351 slm = new SplitLogManager(master, conf); 352 TaskBatch batch = new TaskBatch(); 353 String tasknode = submitTaskAndWait(batch, "foo/1"); 354 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 355 SplitLogTask slt = new SplitLogTask.Done(worker1); 356 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 357 synchronized (batch) { 358 while (batch.installed != batch.done) { 359 batch.wait(); 360 } 361 } 362 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); 363 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); 364 } 365 366 @Test 367 public void testTaskErr() throws Exception { 368 LOG.info("TestTaskErr - cleanup task node once in ERR state"); 369 370 conf.setInt("hbase.splitlog.max.resubmit", 0); 371 slm = new SplitLogManager(master, conf); 372 TaskBatch batch = new TaskBatch(); 373 374 String tasknode = submitTaskAndWait(batch, "foo/1"); 375 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 376 SplitLogTask slt = new SplitLogTask.Err(worker1); 377 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 378 379 synchronized (batch) { 380 while (batch.installed != batch.error) { 381 batch.wait(); 382 } 383 } 384 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); 385 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); 386 conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT); 387 } 388 389 @Test 390 public void testTaskResigned() throws Exception { 391 LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); 392 assertEquals(0, tot_mgr_resubmit.sum()); 393 slm = new SplitLogManager(master, conf); 394 assertEquals(0, tot_mgr_resubmit.sum()); 395 TaskBatch batch = new TaskBatch(); 396 String tasknode = submitTaskAndWait(batch, "foo/1"); 397 assertEquals(0, tot_mgr_resubmit.sum()); 398 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 399 assertEquals(0, tot_mgr_resubmit.sum()); 400 SplitLogTask slt = new SplitLogTask.Resigned(worker1); 401 assertEquals(0, tot_mgr_resubmit.sum()); 402 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 403 ZKUtil.checkExists(zkw, tasknode); 404 // Could be small race here. 405 if (tot_mgr_resubmit.sum() == 0) { 406 waitForCounter(tot_mgr_resubmit, 0, 1, to/2); 407 } 408 assertEquals(1, tot_mgr_resubmit.sum()); 409 410 byte[] taskstate = ZKUtil.getData(zkw, tasknode); 411 slt = SplitLogTask.parseFrom(taskstate); 412 assertTrue(slt.isUnassigned(master.getServerName())); 413 } 414 415 @Test 416 public void testUnassignedTimeout() throws Exception { 417 LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" + 418 " resubmit"); 419 420 // create an orphan task in OWNED state 421 String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1"); 422 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 423 SplitLogTask slt = new SplitLogTask.Owned(worker1); 424 zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, 425 CreateMode.PERSISTENT); 426 427 slm = new SplitLogManager(master, conf); 428 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); 429 430 // submit another task which will stay in unassigned mode 431 TaskBatch batch = new TaskBatch(); 432 submitTaskAndWait(batch, "foo/1"); 433 434 // keep updating the orphan owned node every to/2 seconds 435 for (int i = 0; i < (3 * to)/100; i++) { 436 Thread.sleep(100); 437 final ServerName worker2 = ServerName.valueOf("worker1,1,1"); 438 slt = new SplitLogTask.Owned(worker2); 439 ZKUtil.setData(zkw, tasknode1, slt.toByteArray()); 440 } 441 442 // since we have stopped heartbeating the owned node therefore it should 443 // get resubmitted 444 LOG.info("waiting for manager to resubmit the orphan task"); 445 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); 446 447 // now all the nodes are unassigned. manager should post another rescan 448 waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2); 449 } 450 451 @Test 452 public void testDeadWorker() throws Exception { 453 LOG.info("testDeadWorker"); 454 455 conf.setLong("hbase.splitlog.max.resubmit", 0); 456 slm = new SplitLogManager(master, conf); 457 TaskBatch batch = new TaskBatch(); 458 459 String tasknode = submitTaskAndWait(batch, "foo/1"); 460 int version = ZKUtil.checkExists(zkw, tasknode); 461 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 462 SplitLogTask slt = new SplitLogTask.Owned(worker1); 463 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 464 if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); 465 slm.handleDeadWorker(worker1); 466 if (tot_mgr_resubmit.sum() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2); 467 if (tot_mgr_resubmit_dead_server_task.sum() == 0) { 468 waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2); 469 } 470 471 int version1 = ZKUtil.checkExists(zkw, tasknode); 472 assertTrue(version1 > version); 473 byte[] taskstate = ZKUtil.getData(zkw, tasknode); 474 slt = SplitLogTask.parseFrom(taskstate); 475 assertTrue(slt.isUnassigned(master.getServerName())); 476 return; 477 } 478 479 @Test 480 public void testWorkerCrash() throws Exception { 481 slm = new SplitLogManager(master, conf); 482 TaskBatch batch = new TaskBatch(); 483 484 String tasknode = submitTaskAndWait(batch, "foo/1"); 485 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 486 487 SplitLogTask slt = new SplitLogTask.Owned(worker1); 488 ZKUtil.setData(zkw, tasknode, slt.toByteArray()); 489 if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); 490 491 // Not yet resubmitted. 492 Assert.assertEquals(0, tot_mgr_resubmit.sum()); 493 494 // This server becomes dead 495 Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); 496 497 Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). 498 499 // It has been resubmitted 500 Assert.assertEquals(1, tot_mgr_resubmit.sum()); 501 } 502 503 @Test 504 public void testEmptyLogDir() throws Exception { 505 LOG.info("testEmptyLogDir"); 506 slm = new SplitLogManager(master, conf); 507 FileSystem fs = TEST_UTIL.getTestFileSystem(); 508 Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME), 509 ServerName.valueOf("emptyLogDir", 1, 1).toString()); 510 fs.mkdirs(emptyLogDirPath); 511 slm.splitLogDistributed(emptyLogDirPath); 512 assertFalse(fs.exists(emptyLogDirPath)); 513 } 514 515 @Test 516 public void testLogFilesAreArchived() throws Exception { 517 LOG.info("testLogFilesAreArchived"); 518 slm = new SplitLogManager(master, conf); 519 FileSystem fs = TEST_UTIL.getTestFileSystem(); 520 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived"); 521 conf.set(HConstants.HBASE_DIR, dir.toString()); 522 String serverName = ServerName.valueOf("foo", 1, 1).toString(); 523 Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName); 524 fs.mkdirs(logDirPath); 525 // create an empty log file 526 String logFile = new Path(logDirPath, UUID.randomUUID().toString()).toString(); 527 fs.create(new Path(logDirPath, logFile)).close(); 528 529 // spin up a thread mocking split done. 530 new Thread() { 531 @Override 532 public void run() { 533 boolean done = false; 534 while (!done) { 535 for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) { 536 final ServerName worker1 = ServerName.valueOf("worker1,1,1"); 537 SplitLogTask slt = new SplitLogTask.Done(worker1); 538 boolean encounteredZKException = false; 539 try { 540 ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray()); 541 } catch (KeeperException e) { 542 LOG.warn(e.toString(), e); 543 encounteredZKException = true; 544 } 545 if (!encounteredZKException) { 546 done = true; 547 } 548 } 549 } 550 }; 551 }.start(); 552 553 slm.splitLogDistributed(logDirPath); 554 555 assertFalse(fs.exists(logDirPath)); 556 } 557}