002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
020import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
028import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
029import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
030import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
031import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
032import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
033import static org.junit.Assert.assertEquals;
034import static org.junit.Assert.assertFalse;
035import static org.junit.Assert.assertTrue;
037import java.io.IOException;
038import java.util.Map;
039import java.util.UUID;
040import java.util.concurrent.atomic.LongAdder;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.CoordinatedStateManager;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.SplitLogCounters;
050import org.apache.hadoop.hbase.SplitLogTask;
051import org.apache.hadoop.hbase.Waiter;
052import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
053import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
054import org.apache.hadoop.hbase.master.SplitLogManager.Task;
055import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
056import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
057import org.apache.hadoop.hbase.testclassification.MasterTests;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
060import org.apache.hadoop.hbase.zookeeper.ZKUtil;
061import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
062import org.apache.zookeeper.CreateMode;
063import org.apache.zookeeper.KeeperException;
064import org.apache.zookeeper.ZooDefs.Ids;
065import org.junit.After;
066import org.junit.Assert;
067import org.junit.Before;
068import org.junit.ClassRule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.mockito.Mockito;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
075@Category({MasterTests.class, MediumTests.class})
076public class TestSplitLogManager {
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080      HBaseClassTestRule.forClass(TestSplitLogManager.class);
082  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
084  private final ServerManager sm = Mockito.mock(ServerManager.class);
086  private ZKWatcher zkw;
087  private DummyMasterServices master;
088  private SplitLogManager slm;
089  private Configuration conf;
090  private int to;
092  private static HBaseTestingUtility TEST_UTIL;
094  class DummyMasterServices extends MockNoopMasterServices {
095    private ZKWatcher zkw;
096    private CoordinatedStateManager cm;
098    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
099      super(conf);
100      this.zkw = zkw;
101      cm = new ZkCoordinatedStateManager(this);
102    }
104    @Override
105    public ZKWatcher getZooKeeper() {
106      return zkw;
107    }
109    @Override
110    public CoordinatedStateManager getCoordinatedStateManager() {
111      return cm;
112    }
114    @Override
115    public ServerManager getServerManager() {
116      return sm;
117    }
118  }
120  @Before
121  public void setup() throws Exception {
122    TEST_UTIL = new HBaseTestingUtility();
123    TEST_UTIL.startMiniZKCluster();
124    conf = TEST_UTIL.getConfiguration();
125    // Use a different ZK wrapper instance for each tests.
126    zkw =
127        new ZKWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
128    master = new DummyMasterServices(zkw, conf);
130    ZKUtil.deleteChildrenRecursively(zkw, zkw.znodePaths.baseZNode);
131    ZKUtil.createAndFailSilent(zkw, zkw.znodePaths.baseZNode);
132    assertTrue(ZKUtil.checkExists(zkw, zkw.znodePaths.baseZNode) != -1);
133    LOG.debug(zkw.znodePaths.baseZNode + " created");
134    ZKUtil.createAndFailSilent(zkw, zkw.znodePaths.splitLogZNode);
135    assertTrue(ZKUtil.checkExists(zkw, zkw.znodePaths.splitLogZNode) != -1);
136    LOG.debug(zkw.znodePaths.splitLogZNode + " created");
138    resetCounters();
140    // By default, we let the test manage the error as before, so the server
141    // does not appear as dead from the master point of view, only from the split log pov.
142    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);
144    to = 12000;
145    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
146    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
148    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
149    to = to + 16 * 100;
150  }
152  @After
153  public void teardown() throws IOException, KeeperException {
154    master.stop("");
155    if (slm != null) slm.stop();
156    TEST_UTIL.shutdownMiniZKCluster();
157  }
159  private interface Expr {
160    long eval();
161  }
163  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
164      throws Exception {
165    Expr e = new Expr() {
166      @Override
167      public long eval() {
168        return ctr.sum();
169      }
170    };
171    waitForCounter(e, oldval, newval, timems);
172    return;
173  }
175  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
176      throws Exception {
178    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
179      @Override
180      public boolean evaluate() throws Exception {
181        return (e.eval() != oldval);
182      }
183    });
185    assertEquals(newval, e.eval());
186  }
188  private Task findOrCreateOrphanTask(String path) {
189    return slm.tasks.computeIfAbsent(path, k -> {
190      LOG.info("creating orphan task " + k);
191      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
192      return new Task();
193    });
194  }
196  private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
197      InterruptedException {
198    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
199    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
200    zkw.registerListener(listener);
201    ZKUtil.watchAndCheckExists(zkw, tasknode);
203    slm.enqueueSplitTask(name, batch);
204    assertEquals(1, batch.installed);
205    assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
206    assertEquals(1L, tot_mgr_node_create_queued.sum());
208    LOG.debug("waiting for task node creation");
209    listener.waitForCreation();
210    LOG.debug("task created");
211    return tasknode;
212  }
214  /**
215   * Test whether the splitlog correctly creates a task in zookeeper
216   * @throws Exception
217   */
218  @Test
219  public void testTaskCreation() throws Exception {
221    LOG.info("TestTaskCreation - test the creation of a task in zk");
222    slm = new SplitLogManager(master, conf);
223    TaskBatch batch = new TaskBatch();
225    String tasknode = submitTaskAndWait(batch, "foo/1");
227    byte[] data = ZKUtil.getData(zkw, tasknode);
228    SplitLogTask slt = SplitLogTask.parseFrom(data);
229    LOG.info("Task node created " + slt.toString());
230    assertTrue(slt.isUnassigned(master.getServerName()));
231  }
233  @Test
234  public void testOrphanTaskAcquisition() throws Exception {
235    LOG.info("TestOrphanTaskAcquisition");
237    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
238    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
239    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
240        CreateMode.PERSISTENT);
242    slm = new SplitLogManager(master, conf);
243    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
244    Task task = findOrCreateOrphanTask(tasknode);
245    assertTrue(task.isOrphan());
246    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
247    assertFalse(task.isUnassigned());
248    long curt = System.currentTimeMillis();
249    assertTrue((task.last_update <= curt) &&
250        (task.last_update > (curt - 1000)));
251    LOG.info("waiting for manager to resubmit the orphan task");
252    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
253    assertTrue(task.isUnassigned());
254    waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
255  }
257  @Test
258  public void testUnassignedOrphan() throws Exception {
259    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
260        " startup");
261    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
262    //create an unassigned orphan task
263    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
264    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
265        CreateMode.PERSISTENT);
266    int version = ZKUtil.checkExists(zkw, tasknode);
268    slm = new SplitLogManager(master, conf);
269    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
270    Task task = findOrCreateOrphanTask(tasknode);
271    assertTrue(task.isOrphan());
272    assertTrue(task.isUnassigned());
273    // wait for RESCAN node to be created
274    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
275    Task task2 = findOrCreateOrphanTask(tasknode);
276    assertTrue(task == task2);
277    LOG.debug("task = " + task);
278    assertEquals(1L, tot_mgr_resubmit.sum());
279    assertEquals(1, task.incarnation.get());
280    assertEquals(0, task.unforcedResubmits.get());
281    assertTrue(task.isOrphan());
282    assertTrue(task.isUnassigned());
283    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
284  }
286  @Test
287  public void testMultipleResubmits() throws Exception {
288    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
289    conf.setInt("hbase.splitlog.max.resubmit", 2);
290    slm = new SplitLogManager(master, conf);
291    TaskBatch batch = new TaskBatch();
293    String tasknode = submitTaskAndWait(batch, "foo/1");
294    int version = ZKUtil.checkExists(zkw, tasknode);
295    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
296    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
297    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
298    SplitLogTask slt = new SplitLogTask.Owned(worker1);
299    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
300    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
301    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
302    int version1 = ZKUtil.checkExists(zkw, tasknode);
303    assertTrue(version1 > version);
304    slt = new SplitLogTask.Owned(worker2);
305    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
306    waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
307    waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
308    int version2 = ZKUtil.checkExists(zkw, tasknode);
309    assertTrue(version2 > version1);
310    slt = new SplitLogTask.Owned(worker3);
311    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
312    waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
313    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
314    Thread.sleep(to + to/2);
315    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
316  }
318  @Test
319  public void testRescanCleanup() throws Exception {
320    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
322    slm = new SplitLogManager(master, conf);
323    TaskBatch batch = new TaskBatch();
325    String tasknode = submitTaskAndWait(batch, "foo/1");
326    int version = ZKUtil.checkExists(zkw, tasknode);
327    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
328    SplitLogTask slt = new SplitLogTask.Owned(worker1);
329    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
330    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
331    waitForCounter(new Expr() {
332      @Override
333      public long eval() {
334        return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
335      }
336    }, 0, 1, 5*60000); // wait long enough
337    Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.sum());
338    int version1 = ZKUtil.checkExists(zkw, tasknode);
339    assertTrue(version1 > version);
340    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
341    slt = SplitLogTask.parseFrom(taskstate);
342    assertTrue(slt.isUnassigned(master.getServerName()));
344    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
345  }
347  @Test
348  public void testTaskDone() throws Exception {
349    LOG.info("TestTaskDone - cleanup task node once in DONE state");
351    slm = new SplitLogManager(master, conf);
352    TaskBatch batch = new TaskBatch();
353    String tasknode = submitTaskAndWait(batch, "foo/1");
354    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
355    SplitLogTask slt = new SplitLogTask.Done(worker1);
356    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
357    synchronized (batch) {
358      while (batch.installed != batch.done) {
359        batch.wait();
360      }
361    }
362    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
363    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
364  }
366  @Test
367  public void testTaskErr() throws Exception {
368    LOG.info("TestTaskErr - cleanup task node once in ERR state");
370    conf.setInt("hbase.splitlog.max.resubmit", 0);
371    slm = new SplitLogManager(master, conf);
372    TaskBatch batch = new TaskBatch();
374    String tasknode = submitTaskAndWait(batch, "foo/1");
375    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
376    SplitLogTask slt = new SplitLogTask.Err(worker1);
377    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
379    synchronized (batch) {
380      while (batch.installed != batch.error) {
381        batch.wait();
382      }
383    }
384    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
385    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
386    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
387  }
389  @Test
390  public void testTaskResigned() throws Exception {
391    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
392    assertEquals(0, tot_mgr_resubmit.sum());
393    slm = new SplitLogManager(master, conf);
394    assertEquals(0, tot_mgr_resubmit.sum());
395    TaskBatch batch = new TaskBatch();
396    String tasknode = submitTaskAndWait(batch, "foo/1");
397    assertEquals(0, tot_mgr_resubmit.sum());
398    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
399    assertEquals(0, tot_mgr_resubmit.sum());
400    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
401    assertEquals(0, tot_mgr_resubmit.sum());
402    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
403    ZKUtil.checkExists(zkw, tasknode);
404    // Could be small race here.
405    if (tot_mgr_resubmit.sum() == 0) {
406      waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
407    }
408    assertEquals(1, tot_mgr_resubmit.sum());
410    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
411    slt = SplitLogTask.parseFrom(taskstate);
412    assertTrue(slt.isUnassigned(master.getServerName()));
413  }
415  @Test
416  public void testUnassignedTimeout() throws Exception {
417    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
418        " resubmit");
420    // create an orphan task in OWNED state
421    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
422    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
423    SplitLogTask slt = new SplitLogTask.Owned(worker1);
424    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
425        CreateMode.PERSISTENT);
427    slm = new SplitLogManager(master, conf);
428    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
430    // submit another task which will stay in unassigned mode
431    TaskBatch batch = new TaskBatch();
432    submitTaskAndWait(batch, "foo/1");
434    // keep updating the orphan owned node every to/2 seconds
435    for (int i = 0; i < (3 * to)/100; i++) {
436      Thread.sleep(100);
437      final ServerName worker2 = ServerName.valueOf("worker1,1,1");
438      slt = new SplitLogTask.Owned(worker2);
439      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
440    }
442    // since we have stopped heartbeating the owned node therefore it should
443    // get resubmitted
444    LOG.info("waiting for manager to resubmit the orphan task");
445    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
447    // now all the nodes are unassigned. manager should post another rescan
448    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
449  }
451  @Test
452  public void testDeadWorker() throws Exception {
453    LOG.info("testDeadWorker");
455    conf.setLong("hbase.splitlog.max.resubmit", 0);
456    slm = new SplitLogManager(master, conf);
457    TaskBatch batch = new TaskBatch();
459    String tasknode = submitTaskAndWait(batch, "foo/1");
460    int version = ZKUtil.checkExists(zkw, tasknode);
461    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
462    SplitLogTask slt = new SplitLogTask.Owned(worker1);
463    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
464    if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
465    slm.handleDeadWorker(worker1);
466    if (tot_mgr_resubmit.sum() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
467    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
468      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
469    }
471    int version1 = ZKUtil.checkExists(zkw, tasknode);
472    assertTrue(version1 > version);
473    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
474    slt = SplitLogTask.parseFrom(taskstate);
475    assertTrue(slt.isUnassigned(master.getServerName()));
476    return;
477  }
479  @Test
480  public void testWorkerCrash() throws Exception {
481    slm = new SplitLogManager(master, conf);
482    TaskBatch batch = new TaskBatch();
484    String tasknode = submitTaskAndWait(batch, "foo/1");
485    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
487    SplitLogTask slt = new SplitLogTask.Owned(worker1);
488    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
489    if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
491    // Not yet resubmitted.
492    Assert.assertEquals(0, tot_mgr_resubmit.sum());
494    // This server becomes dead
495    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
497    Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
499    // It has been resubmitted
500    Assert.assertEquals(1, tot_mgr_resubmit.sum());
501  }
503  @Test
504  public void testEmptyLogDir() throws Exception {
505    LOG.info("testEmptyLogDir");
506    slm = new SplitLogManager(master, conf);
507    FileSystem fs = TEST_UTIL.getTestFileSystem();
508    Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME),
509        ServerName.valueOf("emptyLogDir", 1, 1).toString());
510    fs.mkdirs(emptyLogDirPath);
511    slm.splitLogDistributed(emptyLogDirPath);
512    assertFalse(fs.exists(emptyLogDirPath));
513  }
515  @Test
516  public void testLogFilesAreArchived() throws Exception {
517    LOG.info("testLogFilesAreArchived");
518    slm = new SplitLogManager(master, conf);
519    FileSystem fs = TEST_UTIL.getTestFileSystem();
520    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
521    conf.set(HConstants.HBASE_DIR, dir.toString());
522    String serverName = ServerName.valueOf("foo", 1, 1).toString();
523    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
524    fs.mkdirs(logDirPath);
525    // create an empty log file
526    String logFile = new Path(logDirPath, UUID.randomUUID().toString()).toString();
527    fs.create(new Path(logDirPath, logFile)).close();
529    // spin up a thread mocking split done.
530    new Thread() {
531      @Override
532      public void run() {
533        boolean done = false;
534        while (!done) {
535          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
536            final ServerName worker1 = ServerName.valueOf("worker1,1,1");
537            SplitLogTask slt = new SplitLogTask.Done(worker1);
538            boolean encounteredZKException = false;
539            try {
540              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
541            } catch (KeeperException e) {
542              LOG.warn(e.toString(), e);
543              encounteredZKException = true;
544            }
545            if (!encounteredZKException) {
546              done = true;
547            }
548          }
549        }
550      };
551    }.start();
553    slm.splitLogDistributed(logDirPath);
555    assertFalse(fs.exists(logDirPath));
556  }