001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
028import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
029import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
030import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
031import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
032import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
033import static org.junit.Assert.assertEquals;
034import static org.junit.Assert.assertFalse;
035import static org.junit.Assert.assertTrue;
036
037import java.io.IOException;
038import java.util.Map;
039import java.util.concurrent.atomic.LongAdder;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.CoordinatedStateManager;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.SplitLogCounters;
049import org.apache.hadoop.hbase.SplitLogTask;
050import org.apache.hadoop.hbase.Waiter;
051import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
052import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
053import org.apache.hadoop.hbase.master.SplitLogManager.Task;
054import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.zookeeper.TestMasterAddressTracker.NodeCreationListener;
059import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
060import org.apache.hadoop.hbase.zookeeper.ZKUtil;
061import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
062import org.apache.zookeeper.CreateMode;
063import org.apache.zookeeper.KeeperException;
064import org.apache.zookeeper.ZooDefs.Ids;
065import org.junit.After;
066import org.junit.Assert;
067import org.junit.Before;
068import org.junit.ClassRule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.mockito.Mockito;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075@Category({ MasterTests.class, LargeTests.class })
076public class TestSplitLogManager {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestSplitLogManager.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
083
084  private final ServerManager sm = Mockito.mock(ServerManager.class);
085
086  private ZKWatcher zkw;
087  private DummyMasterServices master;
088  private SplitLogManager slm;
089  private Configuration conf;
090  private int to;
091
092  private static HBaseTestingUtility TEST_UTIL;
093
094  class DummyMasterServices extends MockNoopMasterServices {
095    private ZKWatcher zkw;
096    private CoordinatedStateManager cm;
097
098    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
099      super(conf);
100      this.zkw = zkw;
101      cm = new ZkCoordinatedStateManager(this);
102    }
103
104    @Override
105    public ZKWatcher getZooKeeper() {
106      return zkw;
107    }
108
109    @Override
110    public CoordinatedStateManager getCoordinatedStateManager() {
111      return cm;
112    }
113
114    @Override
115    public ServerManager getServerManager() {
116      return sm;
117    }
118  }
119
120  @Before
121  public void setup() throws Exception {
122    TEST_UTIL = new HBaseTestingUtility();
123    TEST_UTIL.startMiniZKCluster();
124    conf = TEST_UTIL.getConfiguration();
125    // Use a different ZK wrapper instance for each tests.
126    zkw =
127      new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
128    master = new DummyMasterServices(zkw, conf);
129
130    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
131    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
132    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
133    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
134    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
135    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
136    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
137
138    resetCounters();
139
140    // By default, we let the test manage the error as before, so the server
141    // does not appear as dead from the master point of view, only from the split log pov.
142    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);
143
144    to = 12000;
145    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
146    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
147
148    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
149    to = to + 16 * 100;
150  }
151
152  @After
153  public void teardown() throws IOException, KeeperException {
154    master.stop("");
155    if (slm != null) {
156      slm.stop();
157    }
158    TEST_UTIL.shutdownMiniZKCluster();
159  }
160
161  @Test
162  public void testBatchWaitMillis() {
163    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(0));
164    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(1));
165    assertEquals(1000, SplitLogManager.getBatchWaitTimeMillis(10));
166    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(101));
167    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(1011));
168  }
169
170  private interface Expr {
171    long eval();
172  }
173
174  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
175    throws Exception {
176    Expr e = new Expr() {
177      @Override
178      public long eval() {
179        return ctr.sum();
180      }
181    };
182    waitForCounter(e, oldval, newval, timems);
183    return;
184  }
185
186  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
187    throws Exception {
188
189    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
190      @Override
191      public boolean evaluate() throws Exception {
192        return (e.eval() != oldval);
193      }
194    });
195
196    assertEquals(newval, e.eval());
197  }
198
199  private Task findOrCreateOrphanTask(String path) {
200    return slm.tasks.computeIfAbsent(path, k -> {
201      LOG.info("creating orphan task " + k);
202      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
203      return new Task();
204    });
205  }
206
207  private String submitTaskAndWait(TaskBatch batch, String name)
208    throws KeeperException, InterruptedException {
209    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
210    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
211    zkw.registerListener(listener);
212    ZKUtil.watchAndCheckExists(zkw, tasknode);
213
214    slm.enqueueSplitTask(name, batch);
215    assertEquals(1, batch.installed);
216    assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
217    assertEquals(1L, tot_mgr_node_create_queued.sum());
218
219    LOG.debug("waiting for task node creation");
220    listener.waitForCreation();
221    LOG.debug("task created");
222    return tasknode;
223  }
224
225  /**
226   * Test whether the splitlog correctly creates a task in zookeeper
227   */
228  @Test
229  public void testTaskCreation() throws Exception {
230
231    LOG.info("TestTaskCreation - test the creation of a task in zk");
232    slm = new SplitLogManager(master, conf);
233    TaskBatch batch = new TaskBatch();
234
235    String tasknode = submitTaskAndWait(batch, "foo/1");
236
237    byte[] data = ZKUtil.getData(zkw, tasknode);
238    SplitLogTask slt = SplitLogTask.parseFrom(data);
239    LOG.info("Task node created " + slt.toString());
240    assertTrue(slt.isUnassigned(master.getServerName()));
241  }
242
243  @Test
244  public void testOrphanTaskAcquisition() throws Exception {
245    LOG.info("TestOrphanTaskAcquisition");
246
247    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
248    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
249    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
250      CreateMode.PERSISTENT);
251
252    slm = new SplitLogManager(master, conf);
253    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
254    Task task = findOrCreateOrphanTask(tasknode);
255    assertTrue(task.isOrphan());
256    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
257    assertFalse(task.isUnassigned());
258    long curt = EnvironmentEdgeManager.currentTime();
259    assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000)));
260    LOG.info("waiting for manager to resubmit the orphan task");
261    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
262    assertTrue(task.isUnassigned());
263    waitForCounter(tot_mgr_rescan, 0, 1, to + to / 2);
264  }
265
266  @Test
267  public void testUnassignedOrphan() throws Exception {
268    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + " startup");
269    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
270    // create an unassigned orphan task
271    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
272    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
273      CreateMode.PERSISTENT);
274    int version = ZKUtil.checkExists(zkw, tasknode);
275
276    slm = new SplitLogManager(master, conf);
277    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
278    Task task = findOrCreateOrphanTask(tasknode);
279    assertTrue(task.isOrphan());
280    assertTrue(task.isUnassigned());
281    // wait for RESCAN node to be created
282    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
283    Task task2 = findOrCreateOrphanTask(tasknode);
284    assertTrue(task == task2);
285    LOG.debug("task = " + task);
286    assertEquals(1L, tot_mgr_resubmit.sum());
287    assertEquals(1, task.incarnation.get());
288    assertEquals(0, task.unforcedResubmits.get());
289    assertTrue(task.isOrphan());
290    assertTrue(task.isUnassigned());
291    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
292  }
293
294  @Test
295  public void testMultipleResubmits() throws Exception {
296    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
297    conf.setInt("hbase.splitlog.max.resubmit", 2);
298    slm = new SplitLogManager(master, conf);
299    TaskBatch batch = new TaskBatch();
300
301    String tasknode = submitTaskAndWait(batch, "foo/1");
302    int version = ZKUtil.checkExists(zkw, tasknode);
303    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
304    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
305    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
306    SplitLogTask slt = new SplitLogTask.Owned(worker1);
307    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
308    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
309    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
310    int version1 = ZKUtil.checkExists(zkw, tasknode);
311    assertTrue(version1 > version);
312    slt = new SplitLogTask.Owned(worker2);
313    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
314    waitForCounter(tot_mgr_heartbeat, 1, 2, to / 2);
315    waitForCounter(tot_mgr_resubmit, 1, 2, to + to / 2);
316    int version2 = ZKUtil.checkExists(zkw, tasknode);
317    assertTrue(version2 > version1);
318    slt = new SplitLogTask.Owned(worker3);
319    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
320    waitForCounter(tot_mgr_heartbeat, 2, 3, to / 2);
321    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to / 2);
322    Thread.sleep(to + to / 2);
323    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
324  }
325
326  @Test
327  public void testRescanCleanup() throws Exception {
328    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
329
330    slm = new SplitLogManager(master, conf);
331    TaskBatch batch = new TaskBatch();
332
333    String tasknode = submitTaskAndWait(batch, "foo/1");
334    int version = ZKUtil.checkExists(zkw, tasknode);
335    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
336    SplitLogTask slt = new SplitLogTask.Owned(worker1);
337    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
338    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
339    waitForCounter(new Expr() {
340      @Override
341      public long eval() {
342        return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
343      }
344    }, 0, 1, 5 * 60000); // wait long enough
345    Assert.assertEquals("Could not run test. Lost ZK connection?", 0,
346      tot_mgr_resubmit_failed.sum());
347    int version1 = ZKUtil.checkExists(zkw, tasknode);
348    assertTrue(version1 > version);
349    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
350    slt = SplitLogTask.parseFrom(taskstate);
351    assertTrue(slt.isUnassigned(master.getServerName()));
352
353    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to / 2);
354  }
355
356  @Test
357  public void testTaskDone() throws Exception {
358    LOG.info("TestTaskDone - cleanup task node once in DONE state");
359
360    slm = new SplitLogManager(master, conf);
361    TaskBatch batch = new TaskBatch();
362    String tasknode = submitTaskAndWait(batch, "foo/1");
363    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
364    SplitLogTask slt = new SplitLogTask.Done(worker1);
365    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
366    synchronized (batch) {
367      while (batch.installed != batch.done) {
368        batch.wait();
369      }
370    }
371    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
372    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
373  }
374
375  @Test
376  public void testTaskErr() throws Exception {
377    LOG.info("TestTaskErr - cleanup task node once in ERR state");
378
379    conf.setInt("hbase.splitlog.max.resubmit", 0);
380    slm = new SplitLogManager(master, conf);
381    TaskBatch batch = new TaskBatch();
382
383    String tasknode = submitTaskAndWait(batch, "foo/1");
384    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
385    SplitLogTask slt = new SplitLogTask.Err(worker1);
386    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
387
388    synchronized (batch) {
389      while (batch.installed != batch.error) {
390        batch.wait();
391      }
392    }
393    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
394    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
395    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
396  }
397
398  @Test
399  public void testTaskResigned() throws Exception {
400    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
401    assertEquals(0, tot_mgr_resubmit.sum());
402    slm = new SplitLogManager(master, conf);
403    assertEquals(0, tot_mgr_resubmit.sum());
404    TaskBatch batch = new TaskBatch();
405    String tasknode = submitTaskAndWait(batch, "foo/1");
406    assertEquals(0, tot_mgr_resubmit.sum());
407    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
408    assertEquals(0, tot_mgr_resubmit.sum());
409    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
410    assertEquals(0, tot_mgr_resubmit.sum());
411    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
412    ZKUtil.checkExists(zkw, tasknode);
413    // Could be small race here.
414    if (tot_mgr_resubmit.sum() == 0) {
415      waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
416    }
417    assertEquals(1, tot_mgr_resubmit.sum());
418
419    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
420    slt = SplitLogTask.parseFrom(taskstate);
421    assertTrue(slt.isUnassigned(master.getServerName()));
422  }
423
424  @Test
425  public void testUnassignedTimeout() throws Exception {
426    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" + " resubmit");
427
428    // create an orphan task in OWNED state
429    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
430    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
431    SplitLogTask slt = new SplitLogTask.Owned(worker1);
432    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
433      CreateMode.PERSISTENT);
434
435    slm = new SplitLogManager(master, conf);
436    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
437
438    // submit another task which will stay in unassigned mode
439    TaskBatch batch = new TaskBatch();
440    submitTaskAndWait(batch, "foo/1");
441
442    // keep updating the orphan owned node every to/2 seconds
443    for (int i = 0; i < (3 * to) / 100; i++) {
444      Thread.sleep(100);
445      final ServerName worker2 = ServerName.valueOf("worker1,1,1");
446      slt = new SplitLogTask.Owned(worker2);
447      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
448    }
449
450    // since we have stopped heartbeating the owned node therefore it should
451    // get resubmitted
452    LOG.info("waiting for manager to resubmit the orphan task");
453    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
454
455    // now all the nodes are unassigned. manager should post another rescan
456    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to / 2);
457  }
458
459  @Test
460  public void testDeadWorker() throws Exception {
461    LOG.info("testDeadWorker");
462
463    conf.setLong("hbase.splitlog.max.resubmit", 0);
464    slm = new SplitLogManager(master, conf);
465    TaskBatch batch = new TaskBatch();
466
467    String tasknode = submitTaskAndWait(batch, "foo/1");
468    int version = ZKUtil.checkExists(zkw, tasknode);
469    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
470    SplitLogTask slt = new SplitLogTask.Owned(worker1);
471    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
472    if (tot_mgr_heartbeat.sum() == 0) {
473      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
474    }
475    slm.handleDeadWorker(worker1);
476    if (tot_mgr_resubmit.sum() == 0) {
477      waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
478    }
479    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
480      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to / 2);
481    }
482
483    int version1 = ZKUtil.checkExists(zkw, tasknode);
484    assertTrue(version1 > version);
485    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
486    slt = SplitLogTask.parseFrom(taskstate);
487    assertTrue(slt.isUnassigned(master.getServerName()));
488    return;
489  }
490
491  @Test
492  public void testWorkerCrash() throws Exception {
493    slm = new SplitLogManager(master, conf);
494    TaskBatch batch = new TaskBatch();
495
496    String tasknode = submitTaskAndWait(batch, "foo/1");
497    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
498
499    SplitLogTask slt = new SplitLogTask.Owned(worker1);
500    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
501    if (tot_mgr_heartbeat.sum() == 0) {
502      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
503    }
504
505    // Not yet resubmitted.
506    Assert.assertEquals(0, tot_mgr_resubmit.sum());
507
508    // This server becomes dead
509    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
510
511    Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
512
513    // It has been resubmitted
514    Assert.assertEquals(1, tot_mgr_resubmit.sum());
515  }
516
517  @Test
518  public void testEmptyLogDir() throws Exception {
519    LOG.info("testEmptyLogDir");
520    slm = new SplitLogManager(master, conf);
521    FileSystem fs = TEST_UTIL.getTestFileSystem();
522    Path emptyLogDirPath =
523      new Path(new Path(fs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME),
524        ServerName.valueOf("emptyLogDir", 1, 1).toString());
525    fs.mkdirs(emptyLogDirPath);
526    slm.splitLogDistributed(emptyLogDirPath);
527    assertFalse(fs.exists(emptyLogDirPath));
528  }
529
530  @Test
531  public void testLogFilesAreArchived() throws Exception {
532    LOG.info("testLogFilesAreArchived");
533    slm = new SplitLogManager(master, conf);
534    FileSystem fs = TEST_UTIL.getTestFileSystem();
535    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
536    conf.set(HConstants.HBASE_DIR, dir.toString());
537    String serverName = ServerName.valueOf("foo", 1, 1).toString();
538    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
539    fs.mkdirs(logDirPath);
540    // create an empty log file
541    String logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString()).toString();
542    fs.create(new Path(logDirPath, logFile)).close();
543
544    // spin up a thread mocking split done.
545    new Thread() {
546      @Override
547      public void run() {
548        boolean done = false;
549        while (!done) {
550          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
551            final ServerName worker1 = ServerName.valueOf("worker1,1,1");
552            SplitLogTask slt = new SplitLogTask.Done(worker1);
553            boolean encounteredZKException = false;
554            try {
555              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
556            } catch (KeeperException e) {
557              LOG.warn(e.toString(), e);
558              encounteredZKException = true;
559            }
560            if (!encounteredZKException) {
561              done = true;
562            }
563          }
564        }
565      };
566    }.start();
567
568    slm.splitLogDistributed(logDirPath);
569
570    assertFalse(fs.exists(logDirPath));
571  }
572}