/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link SplitLogManager}. Each test starts a mini ZooKeeper cluster, creates split-log
 * task znodes (directly or through the manager), mutates their state the way a worker would, and
 * then checks the manager's reaction through the {@link SplitLogCounters} counters.
 */
@Category({MasterTests.class, MediumTests.class})
public class TestSplitLogManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSplitLogManager.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);

  /** Mocked server manager; {@link #setup()} stubs it to report every server as online. */
  private final ServerManager sm = Mockito.mock(ServerManager.class);

  private ZKWatcher zkw;
  private DummyMasterServices master;
  private SplitLogManager slm;
  private Configuration conf;
  /** Base wait budget in milliseconds; computed in {@link #setup()}. */
  private int to;

  // Re-created in setup() for every test method.
  private static HBaseTestingUtility TEST_UTIL;

  /**
   * Master services stub that hands out this test's ZooKeeper watcher, a ZooKeeper-backed
   * coordinated state manager and the mocked {@link ServerManager}.
   */
  class DummyMasterServices extends MockNoopMasterServices {
    private final ZKWatcher zkw;
    private final CoordinatedStateManager cm;

    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
      super(conf);
      this.zkw = zkw;
      cm = new ZkCoordinatedStateManager(this);
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return cm;
    }

    @Override
    public ServerManager getServerManager() {
      return sm;
    }
  }

  /**
   * Starts a mini ZK cluster, creates the base and split-log znodes, resets the split-log counters
   * and configures the manager timeouts used by the tests.
   */
  @Before
  public void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniZKCluster();
    conf = TEST_UTIL.getConfiguration();
    // Use a different ZK wrapper instance for each test.
    zkw =
        new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
    master = new DummyMasterServices(zkw, conf);

    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
    LOG.debug("{} created", zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
    LOG.debug("{} created", zkw.getZNodePaths().splitLogZNode);

    resetCounters();

    // By default, we let the test manage the error as before, so the server
    // does not appear as dead from the master point of view, only from the split log pov.
    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);

    to = 12000;
    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);

    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
    // Add slack of 16 timeout-monitor periods on top of the manager timeout.
    to = to + 16 * 100;
  }

  /**
   * Stops the master stub and the manager, then shuts down the mini ZK cluster. Null-guarded so
   * that a failure in {@link #setup()} is not masked by a NullPointerException here, and the
   * cluster shutdown runs even if one of the stop calls throws.
   */
  @After
  public void teardown() throws IOException, KeeperException {
    try {
      if (master != null) {
        master.stop("");
      }
      if (slm != null) {
        slm.stop();
      }
    } finally {
      if (TEST_UTIL != null) {
        TEST_UTIL.shutdownMiniZKCluster();
      }
    }
  }

  /** A value that can be sampled repeatedly, e.g. a counter or a sum of counters. */
  private interface Expr {
    long eval();
  }

  /**
   * Waits for {@code ctr} to move away from {@code oldval} and asserts it then equals
   * {@code newval}.
   */
  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
      throws Exception {
    waitForCounter(ctr::sum, oldval, newval, timems);
  }

  /**
   * Polls {@code e} every 10 ms, for at most {@code timems} ms, until it differs from
   * {@code oldval}; then asserts that its value equals {@code newval}.
   */
  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
      throws Exception {

    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (e.eval() != oldval);
      }
    });

    assertEquals(newval, e.eval());
  }

  /**
   * Returns the manager's task registered under {@code path}; if there is none, registers a new
   * task (bumping the orphan-acquired counter) and returns it.
   */
  private Task findOrCreateOrphanTask(String path) {
    return slm.tasks.computeIfAbsent(path, k -> {
      LOG.info("creating orphan task {}", k);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
      return new Task();
    });
  }

  /**
   * Enqueues a split task named {@code name} in {@code batch} and blocks until its znode has been
   * created.
   * @return the znode path of the created task
   */
  private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
      InterruptedException {
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
    // Register the listener and set the watch before enqueueing so the creation event is not missed.
    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
    zkw.registerListener(listener);
    ZKUtil.watchAndCheckExists(zkw, tasknode);

    slm.enqueueSplitTask(name, batch);
    assertEquals(1, batch.installed);
    Assert.assertSame(batch, findOrCreateOrphanTask(tasknode).batch);
    assertEquals(1L, tot_mgr_node_create_queued.sum());

    LOG.debug("waiting for task node creation");
    listener.waitForCreation();
    LOG.debug("task created");
    return tasknode;
  }

  /**
   * Test whether the splitlog correctly creates a task in zookeeper
   */
  @Test
  public void testTaskCreation() throws Exception {

    LOG.info("TestTaskCreation - test the creation of a task in zk");
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");

    byte[] data = ZKUtil.getData(zkw, tasknode);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    LOG.info("Task node created {}", slt);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * An OWNED task znode that exists before the manager starts is picked up as an orphan, is
   * heartbeated, and is resubmitted once it is no longer updated.
   */
  @Test
  public void testOrphanTaskAcquisition() throws Exception {
    LOG.info("TestOrphanTaskAcquisition");

    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
    Task task = findOrCreateOrphanTask(tasknode);
    assertTrue(task.isOrphan());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    assertFalse(task.isUnassigned());
    long curt = System.currentTimeMillis();
    assertTrue((task.last_update <= curt) &&
        (task.last_update > (curt - 1000)));
    LOG.info("waiting for manager to resubmit the orphan task");
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    assertTrue(task.isUnassigned());
    waitForCounter(tot_mgr_rescan, 0, 1, to + to / 2);
  }

  /** An UNASSIGNED orphan task znode found at startup is resubmitted. */
  @Test
  public void testUnassignedOrphan() throws Exception {
    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
        " startup");
    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
    // create an unassigned orphan task
    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);
    int version = ZKUtil.checkExists(zkw, tasknode);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
    Task task = findOrCreateOrphanTask(tasknode);
    assertTrue(task.isOrphan());
    assertTrue(task.isUnassigned());
    // wait for RESCAN node to be created
    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
    Task task2 = findOrCreateOrphanTask(tasknode);
    Assert.assertSame(task, task2);
    LOG.debug("task = {}", task);
    assertEquals(1L, tot_mgr_resubmit.sum());
    assertEquals(1, task.incarnation.get());
    assertEquals(0, task.unforcedResubmits.get());
    assertTrue(task.isOrphan());
    assertTrue(task.isUnassigned());
    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
  }

  /** With max resubmit set to 2, a task is not resubmitted (unforced) more than twice. */
  @Test
  public void testMultipleResubmits() throws Exception {
    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
    conf.setInt("hbase.splitlog.max.resubmit", 2);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    slt = new SplitLogTask.Owned(worker2);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 1, 2, to / 2);
    waitForCounter(tot_mgr_resubmit, 1, 2, to + to / 2);
    int version2 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version2 > version1);
    slt = new SplitLogTask.Owned(worker3);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 2, 3, to / 2);
    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to / 2);
    // Deliberate fixed wait: this is a negative check that no further unforced resubmit happens.
    Thread.sleep(to + to / 2);
    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
  }

  /** After a task is resubmitted, the RESCAN node created for it is deleted again. */
  @Test
  public void testRescanCleanup() throws Exception {
    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");

    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    // Wait until the resubmit either succeeded or failed, so a failure is reported explicitly below.
    waitForCounter(() -> tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum(),
        0, 1, 5 * 60000); // wait long enough
    Assert.assertEquals("Could not run test. Lost ZK connection?", 0,
        tot_mgr_resubmit_failed.sum());
    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));

    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to / 2);
  }

  /** A task whose znode moves to DONE completes its batch and has its znode deleted. */
  @Test
  public void testTaskDone() throws Exception {
    LOG.info("TestTaskDone - cleanup task node once in DONE state");

    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Done(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    synchronized (batch) {
      while (batch.installed != batch.done) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
    assertEquals(-1, ZKUtil.checkExists(zkw, tasknode));
  }

  /** With resubmits disabled, a task whose znode moves to ERR fails its batch and is deleted. */
  @Test
  public void testTaskErr() throws Exception {
    LOG.info("TestTaskErr - cleanup task node once in ERR state");

    conf.setInt("hbase.splitlog.max.resubmit", 0);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Err(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());

    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
    assertEquals(-1, ZKUtil.checkExists(zkw, tasknode));
    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
  }

  /** A task whose znode moves to RESIGNED is resubmitted exactly once and becomes unassigned. */
  @Test
  public void testTaskResigned() throws Exception {
    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
    // The counter is re-checked after every step to pinpoint any premature resubmit.
    assertEquals(0, tot_mgr_resubmit.sum());
    slm = new SplitLogManager(master, conf);
    assertEquals(0, tot_mgr_resubmit.sum());
    TaskBatch batch = new TaskBatch();
    String tasknode = submitTaskAndWait(batch, "foo/1");
    assertEquals(0, tot_mgr_resubmit.sum());
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    assertEquals(0, tot_mgr_resubmit.sum());
    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
    assertEquals(0, tot_mgr_resubmit.sum());
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    ZKUtil.checkExists(zkw, tasknode);
    // Could be small race here.
    if (tot_mgr_resubmit.sum() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
    }
    assertEquals(1, tot_mgr_resubmit.sum());

    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /**
   * While one orphan task is kept OWNED by rewriting its znode, nothing is resubmitted; once the
   * rewrites stop it is resubmitted, and with all tasks unassigned the manager triggers the
   * unassigned-timeout resubmit.
   */
  @Test
  public void testUnassignedTimeout() throws Exception {
    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
        " resubmit");

    // create an orphan task in OWNED state
    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);

    slm = new SplitLogManager(master, conf);
    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);

    // submit another task which will stay in unassigned mode
    TaskBatch batch = new TaskBatch();
    submitTaskAndWait(batch, "foo/1");

    // keep rewriting the orphan owned node every 100 ms, for about 3 * to ms in total,
    // always as owned by the same worker
    for (int i = 0; i < (3 * to) / 100; i++) {
      Thread.sleep(100);
      slt = new SplitLogTask.Owned(worker1);
      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
    }

    // since we have stopped heartbeating the owned node therefore it should
    // get resubmitted
    LOG.info("waiting for manager to resubmit the orphan task");
    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);

    // now all the nodes are unassigned. manager should post another rescan
    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to / 2);
  }

  /** A task owned by a worker passed to {@code handleDeadWorker} is resubmitted. */
  @Test
  public void testDeadWorker() throws Exception {
    LOG.info("testDeadWorker");

    conf.setInt("hbase.splitlog.max.resubmit", 0);
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    int version = ZKUtil.checkExists(zkw, tasknode);
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    if (tot_mgr_heartbeat.sum() == 0) {
      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    }
    slm.handleDeadWorker(worker1);
    if (tot_mgr_resubmit.sum() == 0) {
      waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
    }
    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to / 2);
    }

    int version1 = ZKUtil.checkExists(zkw, tasknode);
    assertTrue(version1 > version);
    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
    slt = SplitLogTask.parseFrom(taskstate);
    assertTrue(slt.isUnassigned(master.getServerName()));
  }

  /** A task owned by a worker the server manager reports as offline is resubmitted. */
  @Test
  public void testWorkerCrash() throws Exception {
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");

    SplitLogTask slt = new SplitLogTask.Owned(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
    if (tot_mgr_heartbeat.sum() == 0) {
      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
    }

    // Not yet resubmitted.
    Assert.assertEquals(0, tot_mgr_resubmit.sum());

    // This server becomes dead
    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);

    // Poll for the resubmission instead of sleeping a fixed interval and hoping the manager's
    // periodic check has already run; a fixed sleep makes the assertion timing-dependent.
    waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
  }

  /** Splitting an empty log directory removes that directory. */
  @Test
  public void testEmptyLogDir() throws Exception {
    LOG.info("testEmptyLogDir");
    slm = new SplitLogManager(master, conf);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path emptyLogDirPath =
        new Path(new Path(fs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME),
            ServerName.valueOf("emptyLogDir", 1, 1).toString());
    fs.mkdirs(emptyLogDirPath);
    slm.splitLogDistributed(emptyLogDirPath);
    assertFalse(fs.exists(emptyLogDirPath));
  }

  /**
   * Splitting a log directory with one (empty) log file removes the directory once the split task
   * has been marked DONE by a helper thread standing in for a worker.
   */
  @Test
  public void testLogFilesAreArchived() throws Exception {
    LOG.info("testLogFilesAreArchived");
    slm = new SplitLogManager(master, conf);
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
    conf.set(HConstants.HBASE_DIR, dir.toString());
    String serverName = ServerName.valueOf("foo", 1, 1).toString();
    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
    fs.mkdirs(logDirPath);
    // create an empty log file
    Path logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString());
    fs.create(logFile).close();

    // spin up a thread mocking split done.
    Thread splitDoneMocker = new Thread("mock-split-done") {
      @Override
      public void run() {
        final ServerName worker1 = ServerName.valueOf("worker1,1,1");
        boolean done = false;
        while (!done) {
          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
            SplitLogTask slt = new SplitLogTask.Done(worker1);
            try {
              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
              // At least one task was marked DONE; stop after this pass over the tasks.
              done = true;
            } catch (KeeperException e) {
              LOG.warn(e.toString(), e);
            }
          }
          if (!done) {
            // Back off briefly instead of busy-spinning while no task could be marked DONE yet.
            try {
              Thread.sleep(10);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
          }
        }
      }
    };
    // Daemon, so a failed split cannot leave this loop keeping the JVM alive.
    splitDoneMocker.setDaemon(true);
    splitDoneMocker.start();

    slm.splitLogDistributed(logDirPath);

    assertFalse(fs.exists(logDirPath));
  }
}