001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
028import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
029import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
030import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
031import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
032import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
033import static org.junit.Assert.assertEquals;
034import static org.junit.Assert.assertFalse;
035import static org.junit.Assert.assertTrue;
036
037import java.io.IOException;
038import java.util.Map;
039import java.util.concurrent.atomic.LongAdder;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.CoordinatedStateManager;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.SplitLogCounters;
049import org.apache.hadoop.hbase.SplitLogTask;
050import org.apache.hadoop.hbase.Waiter;
051import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
052import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
053import org.apache.hadoop.hbase.master.SplitLogManager.Task;
054import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.zookeeper.TestMasterAddressTracker.NodeCreationListener;
059import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
060import org.apache.hadoop.hbase.zookeeper.ZKUtil;
061import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
062import org.apache.zookeeper.CreateMode;
063import org.apache.zookeeper.KeeperException;
064import org.apache.zookeeper.ZooDefs.Ids;
065import org.junit.After;
066import org.junit.Assert;
067import org.junit.Before;
068import org.junit.ClassRule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.mockito.Mockito;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075@Category({MasterTests.class, LargeTests.class})
076public class TestSplitLogManager {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080      HBaseClassTestRule.forClass(TestSplitLogManager.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
083
084  private final ServerManager sm = Mockito.mock(ServerManager.class);
085
086  private ZKWatcher zkw;
087  private DummyMasterServices master;
088  private SplitLogManager slm;
089  private Configuration conf;
090  private int to;
091
092  private static HBaseTestingUtil TEST_UTIL;
093
094  class DummyMasterServices extends MockNoopMasterServices {
095    private ZKWatcher zkw;
096    private CoordinatedStateManager cm;
097
098    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
099      super(conf);
100      this.zkw = zkw;
101      cm = new ZkCoordinatedStateManager(this);
102    }
103
104    @Override
105    public ZKWatcher getZooKeeper() {
106      return zkw;
107    }
108
109    @Override
110    public CoordinatedStateManager getCoordinatedStateManager() {
111      return cm;
112    }
113
114    @Override
115    public ServerManager getServerManager() {
116      return sm;
117    }
118  }
119
120  @Before
121  public void setup() throws Exception {
122    TEST_UTIL = new HBaseTestingUtil();
123    TEST_UTIL.startMiniZKCluster();
124    conf = TEST_UTIL.getConfiguration();
125    // Use a different ZK wrapper instance for each tests.
126    zkw =
127        new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
128    master = new DummyMasterServices(zkw, conf);
129
130    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
131    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
132    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
133    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
134    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
135    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
136    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
137
138    resetCounters();
139
140    // By default, we let the test manage the error as before, so the server
141    // does not appear as dead from the master point of view, only from the split log pov.
142    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);
143
144    to = 12000;
145    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
146    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
147
148    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
149    to = to + 16 * 100;
150  }
151
152  @After
153  public void teardown() throws IOException, KeeperException {
154    master.stop("");
155    if (slm != null) {
156      slm.stop();
157    }
158    TEST_UTIL.shutdownMiniZKCluster();
159  }
160
161  @Test
162  public void testBatchWaitMillis() {
163    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(0));
164    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(1));
165    assertEquals(1000, SplitLogManager.getBatchWaitTimeMillis(10));
166    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(101));
167    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(1011));
168  }
169
170  private interface Expr {
171    long eval();
172  }
173
174  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
175      throws Exception {
176    Expr e = new Expr() {
177      @Override
178      public long eval() {
179        return ctr.sum();
180      }
181    };
182    waitForCounter(e, oldval, newval, timems);
183    return;
184  }
185
186  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
187      throws Exception {
188
189    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
190      @Override
191      public boolean evaluate() throws Exception {
192        return (e.eval() != oldval);
193      }
194    });
195
196    assertEquals(newval, e.eval());
197  }
198
199  private Task findOrCreateOrphanTask(String path) {
200    return slm.tasks.computeIfAbsent(path, k -> {
201      LOG.info("creating orphan task " + k);
202      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
203      return new Task();
204    });
205  }
206
207  private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
208      InterruptedException {
209    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
210    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
211    zkw.registerListener(listener);
212    ZKUtil.watchAndCheckExists(zkw, tasknode);
213
214    slm.enqueueSplitTask(name, batch);
215    assertEquals(1, batch.installed);
216    assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
217    assertEquals(1L, tot_mgr_node_create_queued.sum());
218
219    LOG.debug("waiting for task node creation");
220    listener.waitForCreation();
221    LOG.debug("task created");
222    return tasknode;
223  }
224
225  /**
226   * Test whether the splitlog correctly creates a task in zookeeper
227   */
228  @Test
229  public void testTaskCreation() throws Exception {
230
231    LOG.info("TestTaskCreation - test the creation of a task in zk");
232    slm = new SplitLogManager(master, conf);
233    TaskBatch batch = new TaskBatch();
234
235    String tasknode = submitTaskAndWait(batch, "foo/1");
236
237    byte[] data = ZKUtil.getData(zkw, tasknode);
238    SplitLogTask slt = SplitLogTask.parseFrom(data);
239    LOG.info("Task node created " + slt.toString());
240    assertTrue(slt.isUnassigned(master.getServerName()));
241  }
242
243  @Test
244  public void testOrphanTaskAcquisition() throws Exception {
245    LOG.info("TestOrphanTaskAcquisition");
246
247    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
248    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
249    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
250        CreateMode.PERSISTENT);
251
252    slm = new SplitLogManager(master, conf);
253    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
254    Task task = findOrCreateOrphanTask(tasknode);
255    assertTrue(task.isOrphan());
256    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
257    assertFalse(task.isUnassigned());
258    long curt = EnvironmentEdgeManager.currentTime();
259    assertTrue((task.last_update <= curt) &&
260        (task.last_update > (curt - 1000)));
261    LOG.info("waiting for manager to resubmit the orphan task");
262    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
263    assertTrue(task.isUnassigned());
264    waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
265  }
266
267  @Test
268  public void testUnassignedOrphan() throws Exception {
269    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
270        " startup");
271    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
272    //create an unassigned orphan task
273    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
274    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
275        CreateMode.PERSISTENT);
276    int version = ZKUtil.checkExists(zkw, tasknode);
277
278    slm = new SplitLogManager(master, conf);
279    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
280    Task task = findOrCreateOrphanTask(tasknode);
281    assertTrue(task.isOrphan());
282    assertTrue(task.isUnassigned());
283    // wait for RESCAN node to be created
284    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
285    Task task2 = findOrCreateOrphanTask(tasknode);
286    assertTrue(task == task2);
287    LOG.debug("task = " + task);
288    assertEquals(1L, tot_mgr_resubmit.sum());
289    assertEquals(1, task.incarnation.get());
290    assertEquals(0, task.unforcedResubmits.get());
291    assertTrue(task.isOrphan());
292    assertTrue(task.isUnassigned());
293    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
294  }
295
296  @Test
297  public void testMultipleResubmits() throws Exception {
298    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
299    conf.setInt("hbase.splitlog.max.resubmit", 2);
300    slm = new SplitLogManager(master, conf);
301    TaskBatch batch = new TaskBatch();
302
303    String tasknode = submitTaskAndWait(batch, "foo/1");
304    int version = ZKUtil.checkExists(zkw, tasknode);
305    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
306    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
307    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
308    SplitLogTask slt = new SplitLogTask.Owned(worker1);
309    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
310    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
311    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
312    int version1 = ZKUtil.checkExists(zkw, tasknode);
313    assertTrue(version1 > version);
314    slt = new SplitLogTask.Owned(worker2);
315    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
316    waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
317    waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
318    int version2 = ZKUtil.checkExists(zkw, tasknode);
319    assertTrue(version2 > version1);
320    slt = new SplitLogTask.Owned(worker3);
321    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
322    waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
323    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
324    Thread.sleep(to + to/2);
325    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
326  }
327
328  @Test
329  public void testRescanCleanup() throws Exception {
330    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
331
332    slm = new SplitLogManager(master, conf);
333    TaskBatch batch = new TaskBatch();
334
335    String tasknode = submitTaskAndWait(batch, "foo/1");
336    int version = ZKUtil.checkExists(zkw, tasknode);
337    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
338    SplitLogTask slt = new SplitLogTask.Owned(worker1);
339    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
340    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
341    waitForCounter(new Expr() {
342      @Override
343      public long eval() {
344        return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
345      }
346    }, 0, 1, 5*60000); // wait long enough
347    Assert.assertEquals("Could not run test. Lost ZK connection?",
348      0, tot_mgr_resubmit_failed.sum());
349    int version1 = ZKUtil.checkExists(zkw, tasknode);
350    assertTrue(version1 > version);
351    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
352    slt = SplitLogTask.parseFrom(taskstate);
353    assertTrue(slt.isUnassigned(master.getServerName()));
354
355    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
356  }
357
358  @Test
359  public void testTaskDone() throws Exception {
360    LOG.info("TestTaskDone - cleanup task node once in DONE state");
361
362    slm = new SplitLogManager(master, conf);
363    TaskBatch batch = new TaskBatch();
364    String tasknode = submitTaskAndWait(batch, "foo/1");
365    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
366    SplitLogTask slt = new SplitLogTask.Done(worker1);
367    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
368    synchronized (batch) {
369      while (batch.installed != batch.done) {
370        batch.wait();
371      }
372    }
373    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
374    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
375  }
376
377  @Test
378  public void testTaskErr() throws Exception {
379    LOG.info("TestTaskErr - cleanup task node once in ERR state");
380
381    conf.setInt("hbase.splitlog.max.resubmit", 0);
382    slm = new SplitLogManager(master, conf);
383    TaskBatch batch = new TaskBatch();
384
385    String tasknode = submitTaskAndWait(batch, "foo/1");
386    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
387    SplitLogTask slt = new SplitLogTask.Err(worker1);
388    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
389
390    synchronized (batch) {
391      while (batch.installed != batch.error) {
392        batch.wait();
393      }
394    }
395    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
396    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
397    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
398  }
399
400  @Test
401  public void testTaskResigned() throws Exception {
402    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
403    assertEquals(0, tot_mgr_resubmit.sum());
404    slm = new SplitLogManager(master, conf);
405    assertEquals(0, tot_mgr_resubmit.sum());
406    TaskBatch batch = new TaskBatch();
407    String tasknode = submitTaskAndWait(batch, "foo/1");
408    assertEquals(0, tot_mgr_resubmit.sum());
409    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
410    assertEquals(0, tot_mgr_resubmit.sum());
411    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
412    assertEquals(0, tot_mgr_resubmit.sum());
413    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
414    ZKUtil.checkExists(zkw, tasknode);
415    // Could be small race here.
416    if (tot_mgr_resubmit.sum() == 0) {
417      waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
418    }
419    assertEquals(1, tot_mgr_resubmit.sum());
420
421    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
422    slt = SplitLogTask.parseFrom(taskstate);
423    assertTrue(slt.isUnassigned(master.getServerName()));
424  }
425
426  @Test
427  public void testUnassignedTimeout() throws Exception {
428    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
429        " resubmit");
430
431    // create an orphan task in OWNED state
432    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
433    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
434    SplitLogTask slt = new SplitLogTask.Owned(worker1);
435    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
436        CreateMode.PERSISTENT);
437
438    slm = new SplitLogManager(master, conf);
439    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
440
441    // submit another task which will stay in unassigned mode
442    TaskBatch batch = new TaskBatch();
443    submitTaskAndWait(batch, "foo/1");
444
445    // keep updating the orphan owned node every to/2 seconds
446    for (int i = 0; i < (3 * to)/100; i++) {
447      Thread.sleep(100);
448      final ServerName worker2 = ServerName.valueOf("worker1,1,1");
449      slt = new SplitLogTask.Owned(worker2);
450      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
451    }
452
453    // since we have stopped heartbeating the owned node therefore it should
454    // get resubmitted
455    LOG.info("waiting for manager to resubmit the orphan task");
456    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
457
458    // now all the nodes are unassigned. manager should post another rescan
459    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
460  }
461
462  @Test
463  public void testDeadWorker() throws Exception {
464    LOG.info("testDeadWorker");
465
466    conf.setLong("hbase.splitlog.max.resubmit", 0);
467    slm = new SplitLogManager(master, conf);
468    TaskBatch batch = new TaskBatch();
469
470    String tasknode = submitTaskAndWait(batch, "foo/1");
471    int version = ZKUtil.checkExists(zkw, tasknode);
472    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
473    SplitLogTask slt = new SplitLogTask.Owned(worker1);
474    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
475    if (tot_mgr_heartbeat.sum() == 0) {
476      waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
477    }
478    slm.handleDeadWorker(worker1);
479    if (tot_mgr_resubmit.sum() == 0) {
480      waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
481    }
482    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
483      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
484    }
485
486    int version1 = ZKUtil.checkExists(zkw, tasknode);
487    assertTrue(version1 > version);
488    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
489    slt = SplitLogTask.parseFrom(taskstate);
490    assertTrue(slt.isUnassigned(master.getServerName()));
491    return;
492  }
493
494  @Test
495  public void testWorkerCrash() throws Exception {
496    slm = new SplitLogManager(master, conf);
497    TaskBatch batch = new TaskBatch();
498
499    String tasknode = submitTaskAndWait(batch, "foo/1");
500    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
501
502    SplitLogTask slt = new SplitLogTask.Owned(worker1);
503    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
504    if (tot_mgr_heartbeat.sum() == 0) {
505      waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
506    }
507
508    // Not yet resubmitted.
509    Assert.assertEquals(0, tot_mgr_resubmit.sum());
510
511    // This server becomes dead
512    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
513
514    Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
515
516    // It has been resubmitted
517    Assert.assertEquals(1, tot_mgr_resubmit.sum());
518  }
519
520  @Test
521  public void testEmptyLogDir() throws Exception {
522    LOG.info("testEmptyLogDir");
523    slm = new SplitLogManager(master, conf);
524    FileSystem fs = TEST_UTIL.getTestFileSystem();
525    Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(),
526      HConstants.HREGION_LOGDIR_NAME),
527        ServerName.valueOf("emptyLogDir", 1, 1).toString());
528    fs.mkdirs(emptyLogDirPath);
529    slm.splitLogDistributed(emptyLogDirPath);
530    assertFalse(fs.exists(emptyLogDirPath));
531  }
532
533  @Test
534  public void testLogFilesAreArchived() throws Exception {
535    LOG.info("testLogFilesAreArchived");
536    slm = new SplitLogManager(master, conf);
537    FileSystem fs = TEST_UTIL.getTestFileSystem();
538    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
539    conf.set(HConstants.HBASE_DIR, dir.toString());
540    String serverName = ServerName.valueOf("foo", 1, 1).toString();
541    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
542    fs.mkdirs(logDirPath);
543    // create an empty log file
544    String logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString()).toString();
545    fs.create(new Path(logDirPath, logFile)).close();
546
547    // spin up a thread mocking split done.
548    new Thread() {
549      @Override
550      public void run() {
551        boolean done = false;
552        while (!done) {
553          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
554            final ServerName worker1 = ServerName.valueOf("worker1,1,1");
555            SplitLogTask slt = new SplitLogTask.Done(worker1);
556            boolean encounteredZKException = false;
557            try {
558              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
559            } catch (KeeperException e) {
560              LOG.warn(e.toString(), e);
561              encounteredZKException = true;
562            }
563            if (!encounteredZKException) {
564              done = true;
565            }
566          }
567        }
568      }
569    }.start();
570
571    slm.splitLogDistributed(logDirPath);
572
573    assertFalse(fs.exists(logDirPath));
574  }
575}