001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
028import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
029import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
030import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
031import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
032import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
033import static org.junit.Assert.assertEquals;
034import static org.junit.Assert.assertFalse;
035import static org.junit.Assert.assertTrue;
036import java.io.IOException;
037import java.util.Map;
038import java.util.concurrent.atomic.LongAdder;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.CoordinatedStateManager;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.SplitLogCounters;
048import org.apache.hadoop.hbase.SplitLogTask;
049import org.apache.hadoop.hbase.Waiter;
050import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
051import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
052import org.apache.hadoop.hbase.master.SplitLogManager.Task;
053import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
054import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
058import org.apache.hadoop.hbase.zookeeper.ZKUtil;
059import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
060import org.apache.zookeeper.CreateMode;
061import org.apache.zookeeper.KeeperException;
062import org.apache.zookeeper.ZooDefs.Ids;
063import org.junit.After;
064import org.junit.Assert;
065import org.junit.Before;
066import org.junit.ClassRule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.mockito.Mockito;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073@Category({MasterTests.class, LargeTests.class})
074public class TestSplitLogManager {
075
076  @ClassRule
077  public static final HBaseClassTestRule CLASS_RULE =
078      HBaseClassTestRule.forClass(TestSplitLogManager.class);
079
080  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
081
082  private final ServerManager sm = Mockito.mock(ServerManager.class);
083
084  private ZKWatcher zkw;
085  private DummyMasterServices master;
086  private SplitLogManager slm;
087  private Configuration conf;
088  private int to;
089
090  private static HBaseTestingUtility TEST_UTIL;
091
092  class DummyMasterServices extends MockNoopMasterServices {
093    private ZKWatcher zkw;
094    private CoordinatedStateManager cm;
095
096    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
097      super(conf);
098      this.zkw = zkw;
099      cm = new ZkCoordinatedStateManager(this);
100    }
101
102    @Override
103    public ZKWatcher getZooKeeper() {
104      return zkw;
105    }
106
107    @Override
108    public CoordinatedStateManager getCoordinatedStateManager() {
109      return cm;
110    }
111
112    @Override
113    public ServerManager getServerManager() {
114      return sm;
115    }
116  }
117
118  @Before
119  public void setup() throws Exception {
120    TEST_UTIL = new HBaseTestingUtility();
121    TEST_UTIL.startMiniZKCluster();
122    conf = TEST_UTIL.getConfiguration();
123    // Use a different ZK wrapper instance for each tests.
124    zkw =
125        new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
126    master = new DummyMasterServices(zkw, conf);
127
128    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
129    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
130    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
131    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
132    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
133    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
134    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
135
136    resetCounters();
137
138    // By default, we let the test manage the error as before, so the server
139    // does not appear as dead from the master point of view, only from the split log pov.
140    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);
141
142    to = 12000;
143    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
144    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
145
146    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
147    to = to + 16 * 100;
148  }
149
150  @After
151  public void teardown() throws IOException, KeeperException {
152    master.stop("");
153    if (slm != null) {
154      slm.stop();
155    }
156    TEST_UTIL.shutdownMiniZKCluster();
157  }
158
159  @Test
160  public void testBatchWaitMillis() {
161    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(0));
162    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(1));
163    assertEquals(1000, SplitLogManager.getBatchWaitTimeMillis(10));
164    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(101));
165    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(1011));
166  }
167
168  private interface Expr {
169    long eval();
170  }
171
172  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
173      throws Exception {
174    Expr e = new Expr() {
175      @Override
176      public long eval() {
177        return ctr.sum();
178      }
179    };
180    waitForCounter(e, oldval, newval, timems);
181    return;
182  }
183
184  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
185      throws Exception {
186
187    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
188      @Override
189      public boolean evaluate() throws Exception {
190        return (e.eval() != oldval);
191      }
192    });
193
194    assertEquals(newval, e.eval());
195  }
196
197  private Task findOrCreateOrphanTask(String path) {
198    return slm.tasks.computeIfAbsent(path, k -> {
199      LOG.info("creating orphan task " + k);
200      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
201      return new Task();
202    });
203  }
204
205  private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
206      InterruptedException {
207    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
208    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
209    zkw.registerListener(listener);
210    ZKUtil.watchAndCheckExists(zkw, tasknode);
211
212    slm.enqueueSplitTask(name, batch);
213    assertEquals(1, batch.installed);
214    assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
215    assertEquals(1L, tot_mgr_node_create_queued.sum());
216
217    LOG.debug("waiting for task node creation");
218    listener.waitForCreation();
219    LOG.debug("task created");
220    return tasknode;
221  }
222
223  /**
224   * Test whether the splitlog correctly creates a task in zookeeper
225   */
226  @Test
227  public void testTaskCreation() throws Exception {
228
229    LOG.info("TestTaskCreation - test the creation of a task in zk");
230    slm = new SplitLogManager(master, conf);
231    TaskBatch batch = new TaskBatch();
232
233    String tasknode = submitTaskAndWait(batch, "foo/1");
234
235    byte[] data = ZKUtil.getData(zkw, tasknode);
236    SplitLogTask slt = SplitLogTask.parseFrom(data);
237    LOG.info("Task node created " + slt.toString());
238    assertTrue(slt.isUnassigned(master.getServerName()));
239  }
240
241  @Test
242  public void testOrphanTaskAcquisition() throws Exception {
243    LOG.info("TestOrphanTaskAcquisition");
244
245    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
246    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
247    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
248        CreateMode.PERSISTENT);
249
250    slm = new SplitLogManager(master, conf);
251    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
252    Task task = findOrCreateOrphanTask(tasknode);
253    assertTrue(task.isOrphan());
254    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
255    assertFalse(task.isUnassigned());
256    long curt = System.currentTimeMillis();
257    assertTrue((task.last_update <= curt) &&
258        (task.last_update > (curt - 1000)));
259    LOG.info("waiting for manager to resubmit the orphan task");
260    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
261    assertTrue(task.isUnassigned());
262    waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
263  }
264
265  @Test
266  public void testUnassignedOrphan() throws Exception {
267    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
268        " startup");
269    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
270    //create an unassigned orphan task
271    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
272    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
273        CreateMode.PERSISTENT);
274    int version = ZKUtil.checkExists(zkw, tasknode);
275
276    slm = new SplitLogManager(master, conf);
277    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
278    Task task = findOrCreateOrphanTask(tasknode);
279    assertTrue(task.isOrphan());
280    assertTrue(task.isUnassigned());
281    // wait for RESCAN node to be created
282    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
283    Task task2 = findOrCreateOrphanTask(tasknode);
284    assertTrue(task == task2);
285    LOG.debug("task = " + task);
286    assertEquals(1L, tot_mgr_resubmit.sum());
287    assertEquals(1, task.incarnation.get());
288    assertEquals(0, task.unforcedResubmits.get());
289    assertTrue(task.isOrphan());
290    assertTrue(task.isUnassigned());
291    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
292  }
293
294  @Test
295  public void testMultipleResubmits() throws Exception {
296    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
297    conf.setInt("hbase.splitlog.max.resubmit", 2);
298    slm = new SplitLogManager(master, conf);
299    TaskBatch batch = new TaskBatch();
300
301    String tasknode = submitTaskAndWait(batch, "foo/1");
302    int version = ZKUtil.checkExists(zkw, tasknode);
303    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
304    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
305    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
306    SplitLogTask slt = new SplitLogTask.Owned(worker1);
307    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
308    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
309    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
310    int version1 = ZKUtil.checkExists(zkw, tasknode);
311    assertTrue(version1 > version);
312    slt = new SplitLogTask.Owned(worker2);
313    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
314    waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
315    waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
316    int version2 = ZKUtil.checkExists(zkw, tasknode);
317    assertTrue(version2 > version1);
318    slt = new SplitLogTask.Owned(worker3);
319    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
320    waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
321    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
322    Thread.sleep(to + to/2);
323    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
324  }
325
326  @Test
327  public void testRescanCleanup() throws Exception {
328    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
329
330    slm = new SplitLogManager(master, conf);
331    TaskBatch batch = new TaskBatch();
332
333    String tasknode = submitTaskAndWait(batch, "foo/1");
334    int version = ZKUtil.checkExists(zkw, tasknode);
335    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
336    SplitLogTask slt = new SplitLogTask.Owned(worker1);
337    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
338    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
339    waitForCounter(new Expr() {
340      @Override
341      public long eval() {
342        return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
343      }
344    }, 0, 1, 5*60000); // wait long enough
345    Assert.assertEquals("Could not run test. Lost ZK connection?",
346      0, tot_mgr_resubmit_failed.sum());
347    int version1 = ZKUtil.checkExists(zkw, tasknode);
348    assertTrue(version1 > version);
349    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
350    slt = SplitLogTask.parseFrom(taskstate);
351    assertTrue(slt.isUnassigned(master.getServerName()));
352
353    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
354  }
355
356  @Test
357  public void testTaskDone() throws Exception {
358    LOG.info("TestTaskDone - cleanup task node once in DONE state");
359
360    slm = new SplitLogManager(master, conf);
361    TaskBatch batch = new TaskBatch();
362    String tasknode = submitTaskAndWait(batch, "foo/1");
363    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
364    SplitLogTask slt = new SplitLogTask.Done(worker1);
365    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
366    synchronized (batch) {
367      while (batch.installed != batch.done) {
368        batch.wait();
369      }
370    }
371    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
372    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
373  }
374
375  @Test
376  public void testTaskErr() throws Exception {
377    LOG.info("TestTaskErr - cleanup task node once in ERR state");
378
379    conf.setInt("hbase.splitlog.max.resubmit", 0);
380    slm = new SplitLogManager(master, conf);
381    TaskBatch batch = new TaskBatch();
382
383    String tasknode = submitTaskAndWait(batch, "foo/1");
384    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
385    SplitLogTask slt = new SplitLogTask.Err(worker1);
386    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
387
388    synchronized (batch) {
389      while (batch.installed != batch.error) {
390        batch.wait();
391      }
392    }
393    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
394    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
395    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
396  }
397
398  @Test
399  public void testTaskResigned() throws Exception {
400    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
401    assertEquals(0, tot_mgr_resubmit.sum());
402    slm = new SplitLogManager(master, conf);
403    assertEquals(0, tot_mgr_resubmit.sum());
404    TaskBatch batch = new TaskBatch();
405    String tasknode = submitTaskAndWait(batch, "foo/1");
406    assertEquals(0, tot_mgr_resubmit.sum());
407    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
408    assertEquals(0, tot_mgr_resubmit.sum());
409    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
410    assertEquals(0, tot_mgr_resubmit.sum());
411    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
412    ZKUtil.checkExists(zkw, tasknode);
413    // Could be small race here.
414    if (tot_mgr_resubmit.sum() == 0) {
415      waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
416    }
417    assertEquals(1, tot_mgr_resubmit.sum());
418
419    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
420    slt = SplitLogTask.parseFrom(taskstate);
421    assertTrue(slt.isUnassigned(master.getServerName()));
422  }
423
424  @Test
425  public void testUnassignedTimeout() throws Exception {
426    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
427        " resubmit");
428
429    // create an orphan task in OWNED state
430    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
431    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
432    SplitLogTask slt = new SplitLogTask.Owned(worker1);
433    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
434        CreateMode.PERSISTENT);
435
436    slm = new SplitLogManager(master, conf);
437    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
438
439    // submit another task which will stay in unassigned mode
440    TaskBatch batch = new TaskBatch();
441    submitTaskAndWait(batch, "foo/1");
442
443    // keep updating the orphan owned node every to/2 seconds
444    for (int i = 0; i < (3 * to)/100; i++) {
445      Thread.sleep(100);
446      final ServerName worker2 = ServerName.valueOf("worker1,1,1");
447      slt = new SplitLogTask.Owned(worker2);
448      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
449    }
450
451    // since we have stopped heartbeating the owned node therefore it should
452    // get resubmitted
453    LOG.info("waiting for manager to resubmit the orphan task");
454    waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
455
456    // now all the nodes are unassigned. manager should post another rescan
457    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
458  }
459
460  @Test
461  public void testDeadWorker() throws Exception {
462    LOG.info("testDeadWorker");
463
464    conf.setLong("hbase.splitlog.max.resubmit", 0);
465    slm = new SplitLogManager(master, conf);
466    TaskBatch batch = new TaskBatch();
467
468    String tasknode = submitTaskAndWait(batch, "foo/1");
469    int version = ZKUtil.checkExists(zkw, tasknode);
470    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
471    SplitLogTask slt = new SplitLogTask.Owned(worker1);
472    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
473    if (tot_mgr_heartbeat.sum() == 0) {
474      waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
475    }
476    slm.handleDeadWorker(worker1);
477    if (tot_mgr_resubmit.sum() == 0) {
478      waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
479    }
480    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
481      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
482    }
483
484    int version1 = ZKUtil.checkExists(zkw, tasknode);
485    assertTrue(version1 > version);
486    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
487    slt = SplitLogTask.parseFrom(taskstate);
488    assertTrue(slt.isUnassigned(master.getServerName()));
489    return;
490  }
491
492  @Test
493  public void testWorkerCrash() throws Exception {
494    slm = new SplitLogManager(master, conf);
495    TaskBatch batch = new TaskBatch();
496
497    String tasknode = submitTaskAndWait(batch, "foo/1");
498    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
499
500    SplitLogTask slt = new SplitLogTask.Owned(worker1);
501    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
502    if (tot_mgr_heartbeat.sum() == 0) {
503      waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
504    }
505
506    // Not yet resubmitted.
507    Assert.assertEquals(0, tot_mgr_resubmit.sum());
508
509    // This server becomes dead
510    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
511
512    Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
513
514    // It has been resubmitted
515    Assert.assertEquals(1, tot_mgr_resubmit.sum());
516  }
517
518  @Test
519  public void testEmptyLogDir() throws Exception {
520    LOG.info("testEmptyLogDir");
521    slm = new SplitLogManager(master, conf);
522    FileSystem fs = TEST_UTIL.getTestFileSystem();
523    Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(),
524      HConstants.HREGION_LOGDIR_NAME),
525        ServerName.valueOf("emptyLogDir", 1, 1).toString());
526    fs.mkdirs(emptyLogDirPath);
527    slm.splitLogDistributed(emptyLogDirPath);
528    assertFalse(fs.exists(emptyLogDirPath));
529  }
530
531  @Test
532  public void testLogFilesAreArchived() throws Exception {
533    LOG.info("testLogFilesAreArchived");
534    slm = new SplitLogManager(master, conf);
535    FileSystem fs = TEST_UTIL.getTestFileSystem();
536    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
537    conf.set(HConstants.HBASE_DIR, dir.toString());
538    String serverName = ServerName.valueOf("foo", 1, 1).toString();
539    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
540    fs.mkdirs(logDirPath);
541    // create an empty log file
542    String logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString()).toString();
543    fs.create(new Path(logDirPath, logFile)).close();
544
545    // spin up a thread mocking split done.
546    new Thread() {
547      @Override
548      public void run() {
549        boolean done = false;
550        while (!done) {
551          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
552            final ServerName worker1 = ServerName.valueOf("worker1,1,1");
553            SplitLogTask slt = new SplitLogTask.Done(worker1);
554            boolean encounteredZKException = false;
555            try {
556              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
557            } catch (KeeperException e) {
558              LOG.warn(e.toString(), e);
559              encounteredZKException = true;
560            }
561            if (!encounteredZKException) {
562              done = true;
563            }
564          }
565        }
566      };
567    }.start();
568
569    slm.splitLogDistributed(logDirPath);
570
571    assertFalse(fs.exists(logDirPath));
572  }
573}