001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
028import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
029import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
030import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
031import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
032import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
033import static org.junit.Assert.assertEquals;
034import static org.junit.Assert.assertFalse;
035import static org.junit.Assert.assertTrue;
036
037import java.io.IOException;
038import java.util.Map;
039import java.util.concurrent.atomic.LongAdder;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.CoordinatedStateManager;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.SplitLogCounters;
049import org.apache.hadoop.hbase.SplitLogTask;
050import org.apache.hadoop.hbase.Waiter;
051import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
052import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
053import org.apache.hadoop.hbase.master.SplitLogManager.Task;
054import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
055import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
059import org.apache.hadoop.hbase.zookeeper.ZKUtil;
060import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
061import org.apache.zookeeper.CreateMode;
062import org.apache.zookeeper.KeeperException;
063import org.apache.zookeeper.ZooDefs.Ids;
064import org.junit.After;
065import org.junit.Assert;
066import org.junit.Before;
067import org.junit.ClassRule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.mockito.Mockito;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074@Category({MasterTests.class, MediumTests.class})
075public class TestSplitLogManager {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079      HBaseClassTestRule.forClass(TestSplitLogManager.class);
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
082
083  private final ServerManager sm = Mockito.mock(ServerManager.class);
084
085  private ZKWatcher zkw;
086  private DummyMasterServices master;
087  private SplitLogManager slm;
088  private Configuration conf;
089  private int to;
090
091  private static HBaseTestingUtility TEST_UTIL;
092
093  class DummyMasterServices extends MockNoopMasterServices {
094    private ZKWatcher zkw;
095    private CoordinatedStateManager cm;
096
097    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
098      super(conf);
099      this.zkw = zkw;
100      cm = new ZkCoordinatedStateManager(this);
101    }
102
103    @Override
104    public ZKWatcher getZooKeeper() {
105      return zkw;
106    }
107
108    @Override
109    public CoordinatedStateManager getCoordinatedStateManager() {
110      return cm;
111    }
112
113    @Override
114    public ServerManager getServerManager() {
115      return sm;
116    }
117  }
118
119  @Before
120  public void setup() throws Exception {
121    TEST_UTIL = new HBaseTestingUtility();
122    TEST_UTIL.startMiniZKCluster();
123    conf = TEST_UTIL.getConfiguration();
124    // Use a different ZK wrapper instance for each tests.
125    zkw =
126        new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
127    master = new DummyMasterServices(zkw, conf);
128
129    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
130    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
131    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
132    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
133    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
134    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
135    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
136
137    resetCounters();
138
139    // By default, we let the test manage the error as before, so the server
140    // does not appear as dead from the master point of view, only from the split log pov.
141    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);
142
143    to = 12000;
144    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
145    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
146
147    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
148    to = to + 16 * 100;
149  }
150
151  @After
152  public void teardown() throws IOException, KeeperException {
153    master.stop("");
154    if (slm != null) slm.stop();
155    TEST_UTIL.shutdownMiniZKCluster();
156  }
157
158  private interface Expr {
159    long eval();
160  }
161
162  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
163      throws Exception {
164    Expr e = new Expr() {
165      @Override
166      public long eval() {
167        return ctr.sum();
168      }
169    };
170    waitForCounter(e, oldval, newval, timems);
171    return;
172  }
173
174  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
175      throws Exception {
176
177    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
178      @Override
179      public boolean evaluate() throws Exception {
180        return (e.eval() != oldval);
181      }
182    });
183
184    assertEquals(newval, e.eval());
185  }
186
187  private Task findOrCreateOrphanTask(String path) {
188    return slm.tasks.computeIfAbsent(path, k -> {
189      LOG.info("creating orphan task " + k);
190      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
191      return new Task();
192    });
193  }
194
195  private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
196      InterruptedException {
197    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
198    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
199    zkw.registerListener(listener);
200    ZKUtil.watchAndCheckExists(zkw, tasknode);
201
202    slm.enqueueSplitTask(name, batch);
203    assertEquals(1, batch.installed);
204    assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
205    assertEquals(1L, tot_mgr_node_create_queued.sum());
206
207    LOG.debug("waiting for task node creation");
208    listener.waitForCreation();
209    LOG.debug("task created");
210    return tasknode;
211  }
212
213  /**
214   * Test whether the splitlog correctly creates a task in zookeeper
215   * @throws Exception
216   */
217  @Test
218  public void testTaskCreation() throws Exception {
219
220    LOG.info("TestTaskCreation - test the creation of a task in zk");
221    slm = new SplitLogManager(master, conf);
222    TaskBatch batch = new TaskBatch();
223
224    String tasknode = submitTaskAndWait(batch, "foo/1");
225
226    byte[] data = ZKUtil.getData(zkw, tasknode);
227    SplitLogTask slt = SplitLogTask.parseFrom(data);
228    LOG.info("Task node created " + slt.toString());
229    assertTrue(slt.isUnassigned(master.getServerName()));
230  }
231
232  @Test
233  public void testOrphanTaskAcquisition() throws Exception {
234    LOG.info("TestOrphanTaskAcquisition");
235
236    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
237    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
238    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
239        CreateMode.PERSISTENT);
240
241    slm = new SplitLogManager(master, conf);
242    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
243    Task task = findOrCreateOrphanTask(tasknode);
244    assertTrue(task.isOrphan());
245    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
246    assertFalse(task.isUnassigned());
247    long curt = System.currentTimeMillis();
248    assertTrue((task.last_update <= curt) &&
249        (task.last_update > (curt - 1000)));
250    LOG.info("waiting for manager to resubmit the orphan task");
251    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
252    assertTrue(task.isUnassigned());
253    waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
254  }
255
256  @Test
257  public void testUnassignedOrphan() throws Exception {
258    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
259        " startup");
260    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
261    //create an unassigned orphan task
262    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
263    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
264        CreateMode.PERSISTENT);
265    int version = ZKUtil.checkExists(zkw, tasknode);
266
267    slm = new SplitLogManager(master, conf);
268    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
269    Task task = findOrCreateOrphanTask(tasknode);
270    assertTrue(task.isOrphan());
271    assertTrue(task.isUnassigned());
272    // wait for RESCAN node to be created
273    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
274    Task task2 = findOrCreateOrphanTask(tasknode);
275    assertTrue(task == task2);
276    LOG.debug("task = " + task);
277    assertEquals(1L, tot_mgr_resubmit.sum());
278    assertEquals(1, task.incarnation.get());
279    assertEquals(0, task.unforcedResubmits.get());
280    assertTrue(task.isOrphan());
281    assertTrue(task.isUnassigned());
282    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
283  }
284
285  @Test
286  public void testMultipleResubmits() throws Exception {
287    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
288    conf.setInt("hbase.splitlog.max.resubmit", 2);
289    slm = new SplitLogManager(master, conf);
290    TaskBatch batch = new TaskBatch();
291
292    String tasknode = submitTaskAndWait(batch, "foo/1");
293    int version = ZKUtil.checkExists(zkw, tasknode);
294    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
295    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
296    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
297    SplitLogTask slt = new SplitLogTask.Owned(worker1);
298    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
299    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
300    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
301    int version1 = ZKUtil.checkExists(zkw, tasknode);
302    assertTrue(version1 > version);
303    slt = new SplitLogTask.Owned(worker2);
304    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
305    waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
306    waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
307    int version2 = ZKUtil.checkExists(zkw, tasknode);
308    assertTrue(version2 > version1);
309    slt = new SplitLogTask.Owned(worker3);
310    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
311    waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
312    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
313    Thread.sleep(to + to/2);
314    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
315  }
316
317  @Test
318  public void testRescanCleanup() throws Exception {
319    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
320
321    slm = new SplitLogManager(master, conf);
322    TaskBatch batch = new TaskBatch();
323
324    String tasknode = submitTaskAndWait(batch, "foo/1");
325    int version = ZKUtil.checkExists(zkw, tasknode);
326    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
327    SplitLogTask slt = new SplitLogTask.Owned(worker1);
328    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
329    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
330    waitForCounter(new Expr() {
331      @Override
332      public long eval() {
333        return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
334      }
335    }, 0, 1, 5*60000); // wait long enough
336    Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.sum());
337    int version1 = ZKUtil.checkExists(zkw, tasknode);
338    assertTrue(version1 > version);
339    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
340    slt = SplitLogTask.parseFrom(taskstate);
341    assertTrue(slt.isUnassigned(master.getServerName()));
342
343    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
344  }
345
346  @Test
347  public void testTaskDone() throws Exception {
348    LOG.info("TestTaskDone - cleanup task node once in DONE state");
349
350    slm = new SplitLogManager(master, conf);
351    TaskBatch batch = new TaskBatch();
352    String tasknode = submitTaskAndWait(batch, "foo/1");
353    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
354    SplitLogTask slt = new SplitLogTask.Done(worker1);
355    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
356    synchronized (batch) {
357      while (batch.installed != batch.done) {
358        batch.wait();
359      }
360    }
361    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
362    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
363  }
364
365  @Test
366  public void testTaskErr() throws Exception {
367    LOG.info("TestTaskErr - cleanup task node once in ERR state");
368
369    conf.setInt("hbase.splitlog.max.resubmit", 0);
370    slm = new SplitLogManager(master, conf);
371    TaskBatch batch = new TaskBatch();
372
373    String tasknode = submitTaskAndWait(batch, "foo/1");
374    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
375    SplitLogTask slt = new SplitLogTask.Err(worker1);
376    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
377
378    synchronized (batch) {
379      while (batch.installed != batch.error) {
380        batch.wait();
381      }
382    }
383    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
384    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
385    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
386  }
387
388  @Test
389  public void testTaskResigned() throws Exception {
390    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
391    assertEquals(0, tot_mgr_resubmit.sum());
392    slm = new SplitLogManager(master, conf);
393    assertEquals(0, tot_mgr_resubmit.sum());
394    TaskBatch batch = new TaskBatch();
395    String tasknode = submitTaskAndWait(batch, "foo/1");
396    assertEquals(0, tot_mgr_resubmit.sum());
397    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
398    assertEquals(0, tot_mgr_resubmit.sum());
399    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
400    assertEquals(0, tot_mgr_resubmit.sum());
401    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
402    ZKUtil.checkExists(zkw, tasknode);
403    // Could be small race here.
404    if (tot_mgr_resubmit.sum() == 0) {
405      waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
406    }
407    assertEquals(1, tot_mgr_resubmit.sum());
408
409    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
410    slt = SplitLogTask.parseFrom(taskstate);
411    assertTrue(slt.isUnassigned(master.getServerName()));
412  }
413
414  @Test
415  public void testUnassignedTimeout() throws Exception {
416    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
417        " resubmit");
418
419    // create an orphan task in OWNED state
420    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
421    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
422    SplitLogTask slt = new SplitLogTask.Owned(worker1);
423    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
424        CreateMode.PERSISTENT);
425
426    slm = new SplitLogManager(master, conf);
427    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
428
429    // submit another task which will stay in unassigned mode
430    TaskBatch batch = new TaskBatch();
431    submitTaskAndWait(batch, "foo/1");
432
433    // keep updating the orphan owned node every to/2 seconds
434    for (int i = 0; i < (3 * to)/100; i++) {
435      Thread.sleep(100);
436      final ServerName worker2 = ServerName.valueOf("worker1,1,1");
437      slt = new SplitLogTask.Owned(worker2);
438      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
439    }
440
441    // since we have stopped heartbeating the owned node therefore it should
442    // get resubmitted
443    LOG.info("waiting for manager to resubmit the orphan task");
444    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
445
446    // now all the nodes are unassigned. manager should post another rescan
447    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
448  }
449
450  @Test
451  public void testDeadWorker() throws Exception {
452    LOG.info("testDeadWorker");
453
454    conf.setLong("hbase.splitlog.max.resubmit", 0);
455    slm = new SplitLogManager(master, conf);
456    TaskBatch batch = new TaskBatch();
457
458    String tasknode = submitTaskAndWait(batch, "foo/1");
459    int version = ZKUtil.checkExists(zkw, tasknode);
460    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
461    SplitLogTask slt = new SplitLogTask.Owned(worker1);
462    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
463    if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
464    slm.handleDeadWorker(worker1);
465    if (tot_mgr_resubmit.sum() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
466    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
467      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
468    }
469
470    int version1 = ZKUtil.checkExists(zkw, tasknode);
471    assertTrue(version1 > version);
472    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
473    slt = SplitLogTask.parseFrom(taskstate);
474    assertTrue(slt.isUnassigned(master.getServerName()));
475    return;
476  }
477
478  @Test
479  public void testWorkerCrash() throws Exception {
480    slm = new SplitLogManager(master, conf);
481    TaskBatch batch = new TaskBatch();
482
483    String tasknode = submitTaskAndWait(batch, "foo/1");
484    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
485
486    SplitLogTask slt = new SplitLogTask.Owned(worker1);
487    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
488    if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
489
490    // Not yet resubmitted.
491    Assert.assertEquals(0, tot_mgr_resubmit.sum());
492
493    // This server becomes dead
494    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
495
496    Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
497
498    // It has been resubmitted
499    Assert.assertEquals(1, tot_mgr_resubmit.sum());
500  }
501
502  @Test
503  public void testEmptyLogDir() throws Exception {
504    LOG.info("testEmptyLogDir");
505    slm = new SplitLogManager(master, conf);
506    FileSystem fs = TEST_UTIL.getTestFileSystem();
507    Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME),
508        ServerName.valueOf("emptyLogDir", 1, 1).toString());
509    fs.mkdirs(emptyLogDirPath);
510    slm.splitLogDistributed(emptyLogDirPath);
511    assertFalse(fs.exists(emptyLogDirPath));
512  }
513
514  @Test
515  public void testLogFilesAreArchived() throws Exception {
516    LOG.info("testLogFilesAreArchived");
517    slm = new SplitLogManager(master, conf);
518    FileSystem fs = TEST_UTIL.getTestFileSystem();
519    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
520    conf.set(HConstants.HBASE_DIR, dir.toString());
521    String serverName = ServerName.valueOf("foo", 1, 1).toString();
522    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
523    fs.mkdirs(logDirPath);
524    // create an empty log file
525    String logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString()).toString();
526    fs.create(new Path(logDirPath, logFile)).close();
527
528    // spin up a thread mocking split done.
529    new Thread() {
530      @Override
531      public void run() {
532        boolean done = false;
533        while (!done) {
534          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
535            final ServerName worker1 = ServerName.valueOf("worker1,1,1");
536            SplitLogTask slt = new SplitLogTask.Done(worker1);
537            boolean encounteredZKException = false;
538            try {
539              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
540            } catch (KeeperException e) {
541              LOG.warn(e.toString(), e);
542              encounteredZKException = true;
543            }
544            if (!encounteredZKException) {
545              done = true;
546            }
547          }
548        }
549      };
550    }.start();
551
552    slm.splitLogDistributed(logDirPath);
553
554    assertFalse(fs.exists(logDirPath));
555  }
556}