001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
022import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
023import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
024import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
025import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
026import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
027import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
028import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
029import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
030import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
031import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
032import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
033import static org.junit.jupiter.api.Assertions.assertEquals;
034import static org.junit.jupiter.api.Assertions.assertFalse;
035import static org.junit.jupiter.api.Assertions.assertTrue;
036
037import java.io.IOException;
038import java.util.Map;
039import java.util.concurrent.atomic.LongAdder;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.CoordinatedStateManager;
044import org.apache.hadoop.hbase.HBaseTestingUtil;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.SplitLogCounters;
048import org.apache.hadoop.hbase.SplitLogTask;
049import org.apache.hadoop.hbase.Waiter;
050import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
051import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
052import org.apache.hadoop.hbase.master.SplitLogManager.Task;
053import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
054import org.apache.hadoop.hbase.testclassification.LargeTests;
055import org.apache.hadoop.hbase.testclassification.MasterTests;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.zookeeper.TestMasterAddressTracker.NodeCreationListener;
058import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
059import org.apache.hadoop.hbase.zookeeper.ZKUtil;
060import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
061import org.apache.zookeeper.CreateMode;
062import org.apache.zookeeper.KeeperException;
063import org.apache.zookeeper.ZooDefs.Ids;
064import org.junit.jupiter.api.AfterEach;
065import org.junit.jupiter.api.BeforeEach;
066import org.junit.jupiter.api.Tag;
067import org.junit.jupiter.api.Test;
068import org.mockito.Mockito;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072@Tag(MasterTests.TAG)
073@Tag(LargeTests.TAG)
074public class TestSplitLogManager {
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
077
078  private final ServerManager sm = Mockito.mock(ServerManager.class);
079
080  private ZKWatcher zkw;
081  private DummyMasterServices master;
082  private SplitLogManager slm;
083  private Configuration conf;
084  private int to;
085
086  private static HBaseTestingUtil TEST_UTIL;
087
088  class DummyMasterServices extends MockNoopMasterServices {
089    private ZKWatcher zkw;
090    private CoordinatedStateManager cm;
091
092    public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
093      super(conf);
094      this.zkw = zkw;
095      cm = new ZkCoordinatedStateManager(this);
096    }
097
098    @Override
099    public ZKWatcher getZooKeeper() {
100      return zkw;
101    }
102
103    @Override
104    public CoordinatedStateManager getCoordinatedStateManager() {
105      return cm;
106    }
107
108    @Override
109    public ServerManager getServerManager() {
110      return sm;
111    }
112  }
113
114  @BeforeEach
115  public void setup() throws Exception {
116    TEST_UTIL = new HBaseTestingUtil();
117    TEST_UTIL.startMiniZKCluster();
118    conf = TEST_UTIL.getConfiguration();
119    // Use a different ZK wrapper instance for each tests.
120    zkw =
121      new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
122    master = new DummyMasterServices(zkw, conf);
123
124    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
125    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
126    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
127    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
128    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
129    assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
130    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
131
132    resetCounters();
133
134    // By default, we let the test manage the error as before, so the server
135    // does not appear as dead from the master point of view, only from the split log pov.
136    Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);
137
138    to = 12000;
139    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
140    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
141
142    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
143    to = to + 16 * 100;
144  }
145
146  @AfterEach
147  public void teardown() throws IOException, KeeperException {
148    master.stop("");
149    if (slm != null) {
150      slm.stop();
151    }
152    TEST_UTIL.shutdownMiniZKCluster();
153  }
154
155  @Test
156  public void testBatchWaitMillis() {
157    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(0));
158    assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(1));
159    assertEquals(1000, SplitLogManager.getBatchWaitTimeMillis(10));
160    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(101));
161    assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(1011));
162  }
163
164  private interface Expr {
165    long eval();
166  }
167
168  private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
169    throws Exception {
170    Expr e = new Expr() {
171      @Override
172      public long eval() {
173        return ctr.sum();
174      }
175    };
176    waitForCounter(e, oldval, newval, timems);
177    return;
178  }
179
180  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
181    throws Exception {
182
183    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
184      @Override
185      public boolean evaluate() throws Exception {
186        return (e.eval() != oldval);
187      }
188    });
189
190    assertEquals(newval, e.eval());
191  }
192
193  private Task findOrCreateOrphanTask(String path) {
194    return slm.tasks.computeIfAbsent(path, k -> {
195      LOG.info("creating orphan task " + k);
196      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
197      return new Task();
198    });
199  }
200
201  private String submitTaskAndWait(TaskBatch batch, String name)
202    throws KeeperException, InterruptedException {
203    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
204    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
205    zkw.registerListener(listener);
206    ZKUtil.watchAndCheckExists(zkw, tasknode);
207
208    slm.enqueueSplitTask(name, batch);
209    assertEquals(1, batch.installed);
210    assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
211    assertEquals(1L, tot_mgr_node_create_queued.sum());
212
213    LOG.debug("waiting for task node creation");
214    listener.waitForCreation();
215    LOG.debug("task created");
216    return tasknode;
217  }
218
219  /**
220   * Test whether the splitlog correctly creates a task in zookeeper
221   */
222  @Test
223  public void testTaskCreation() throws Exception {
224
225    LOG.info("TestTaskCreation - test the creation of a task in zk");
226    slm = new SplitLogManager(master, conf);
227    TaskBatch batch = new TaskBatch();
228
229    String tasknode = submitTaskAndWait(batch, "foo/1");
230
231    byte[] data = ZKUtil.getData(zkw, tasknode);
232    SplitLogTask slt = SplitLogTask.parseFrom(data);
233    LOG.info("Task node created " + slt.toString());
234    assertTrue(slt.isUnassigned(master.getServerName()));
235  }
236
237  @Test
238  public void testOrphanTaskAcquisition() throws Exception {
239    LOG.info("TestOrphanTaskAcquisition");
240
241    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
242    SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
243    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
244      CreateMode.PERSISTENT);
245
246    slm = new SplitLogManager(master, conf);
247    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
248    Task task = findOrCreateOrphanTask(tasknode);
249    assertTrue(task.isOrphan());
250    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
251    assertFalse(task.isUnassigned());
252    long curt = EnvironmentEdgeManager.currentTime();
253    assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000)));
254    LOG.info("waiting for manager to resubmit the orphan task");
255    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
256    assertTrue(task.isUnassigned());
257    waitForCounter(tot_mgr_rescan, 0, 1, to + to / 2);
258  }
259
260  @Test
261  public void testUnassignedOrphan() throws Exception {
262    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + " startup");
263    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
264    // create an unassigned orphan task
265    SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
266    zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
267      CreateMode.PERSISTENT);
268    int version = ZKUtil.checkExists(zkw, tasknode);
269
270    slm = new SplitLogManager(master, conf);
271    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
272    Task task = findOrCreateOrphanTask(tasknode);
273    assertTrue(task.isOrphan());
274    assertTrue(task.isUnassigned());
275    // wait for RESCAN node to be created
276    waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
277    Task task2 = findOrCreateOrphanTask(tasknode);
278    assertTrue(task == task2);
279    LOG.debug("task = " + task);
280    assertEquals(1L, tot_mgr_resubmit.sum());
281    assertEquals(1, task.incarnation.get());
282    assertEquals(0, task.unforcedResubmits.get());
283    assertTrue(task.isOrphan());
284    assertTrue(task.isUnassigned());
285    assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
286  }
287
288  @Test
289  public void testMultipleResubmits() throws Exception {
290    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
291    conf.setInt("hbase.splitlog.max.resubmit", 2);
292    slm = new SplitLogManager(master, conf);
293    TaskBatch batch = new TaskBatch();
294
295    String tasknode = submitTaskAndWait(batch, "foo/1");
296    int version = ZKUtil.checkExists(zkw, tasknode);
297    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
298    final ServerName worker2 = ServerName.valueOf("worker2,1,1");
299    final ServerName worker3 = ServerName.valueOf("worker3,1,1");
300    SplitLogTask slt = new SplitLogTask.Owned(worker1);
301    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
302    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
303    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
304    int version1 = ZKUtil.checkExists(zkw, tasknode);
305    assertTrue(version1 > version);
306    slt = new SplitLogTask.Owned(worker2);
307    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
308    waitForCounter(tot_mgr_heartbeat, 1, 2, to / 2);
309    waitForCounter(tot_mgr_resubmit, 1, 2, to + to / 2);
310    int version2 = ZKUtil.checkExists(zkw, tasknode);
311    assertTrue(version2 > version1);
312    slt = new SplitLogTask.Owned(worker3);
313    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
314    waitForCounter(tot_mgr_heartbeat, 2, 3, to / 2);
315    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to / 2);
316    Thread.sleep(to + to / 2);
317    assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
318  }
319
320  @Test
321  public void testRescanCleanup() throws Exception {
322    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
323
324    slm = new SplitLogManager(master, conf);
325    TaskBatch batch = new TaskBatch();
326
327    String tasknode = submitTaskAndWait(batch, "foo/1");
328    int version = ZKUtil.checkExists(zkw, tasknode);
329    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
330    SplitLogTask slt = new SplitLogTask.Owned(worker1);
331    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
332    waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
333    waitForCounter(new Expr() {
334      @Override
335      public long eval() {
336        return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
337      }
338    }, 0, 1, 5 * 60000); // wait long enough
339    assertEquals(tot_mgr_resubmit_failed.sum(), 0, "Could not run test. Lost ZK connection?");
340    int version1 = ZKUtil.checkExists(zkw, tasknode);
341    assertTrue(version1 > version);
342    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
343    slt = SplitLogTask.parseFrom(taskstate);
344    assertTrue(slt.isUnassigned(master.getServerName()));
345
346    waitForCounter(tot_mgr_rescan_deleted, 0, 1, to / 2);
347  }
348
349  @Test
350  public void testTaskDone() throws Exception {
351    LOG.info("TestTaskDone - cleanup task node once in DONE state");
352
353    slm = new SplitLogManager(master, conf);
354    TaskBatch batch = new TaskBatch();
355    String tasknode = submitTaskAndWait(batch, "foo/1");
356    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
357    SplitLogTask slt = new SplitLogTask.Done(worker1);
358    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
359    synchronized (batch) {
360      while (batch.installed != batch.done) {
361        batch.wait();
362      }
363    }
364    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
365    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
366  }
367
368  @Test
369  public void testTaskErr() throws Exception {
370    LOG.info("TestTaskErr - cleanup task node once in ERR state");
371
372    conf.setInt("hbase.splitlog.max.resubmit", 0);
373    slm = new SplitLogManager(master, conf);
374    TaskBatch batch = new TaskBatch();
375
376    String tasknode = submitTaskAndWait(batch, "foo/1");
377    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
378    SplitLogTask slt = new SplitLogTask.Err(worker1);
379    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
380
381    synchronized (batch) {
382      while (batch.installed != batch.error) {
383        batch.wait();
384      }
385    }
386    waitForCounter(tot_mgr_task_deleted, 0, 1, to / 2);
387    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
388    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
389  }
390
391  @Test
392  public void testTaskResigned() throws Exception {
393    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
394    assertEquals(0, tot_mgr_resubmit.sum());
395    slm = new SplitLogManager(master, conf);
396    assertEquals(0, tot_mgr_resubmit.sum());
397    TaskBatch batch = new TaskBatch();
398    String tasknode = submitTaskAndWait(batch, "foo/1");
399    assertEquals(0, tot_mgr_resubmit.sum());
400    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
401    assertEquals(0, tot_mgr_resubmit.sum());
402    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
403    assertEquals(0, tot_mgr_resubmit.sum());
404    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
405    ZKUtil.checkExists(zkw, tasknode);
406    // Could be small race here.
407    if (tot_mgr_resubmit.sum() == 0) {
408      waitForCounter(tot_mgr_resubmit, 0, 1, to / 2);
409    }
410    assertEquals(1, tot_mgr_resubmit.sum());
411
412    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
413    slt = SplitLogTask.parseFrom(taskstate);
414    assertTrue(slt.isUnassigned(master.getServerName()));
415  }
416
417  @Test
418  public void testUnassignedTimeout() throws Exception {
419    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" + " resubmit");
420
421    // create an orphan task in OWNED state
422    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
423    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
424    SplitLogTask slt = new SplitLogTask.Owned(worker1);
425    zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
426      CreateMode.PERSISTENT);
427
428    slm = new SplitLogManager(master, conf);
429    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to / 2);
430
431    // submit another task which will stay in unassigned mode
432    TaskBatch batch = new TaskBatch();
433    submitTaskAndWait(batch, "foo/1");
434
435    // keep updating the orphan owned node every to/2 seconds
436    for (int i = 0; i < (3 * to) / 100; i++) {
437      Thread.sleep(100);
438      final ServerName worker2 = ServerName.valueOf("worker1,1,1");
439      slt = new SplitLogTask.Owned(worker2);
440      ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
441    }
442
443    // since we have stopped heartbeating the owned node therefore it should
444    // get resubmitted
445    LOG.info("waiting for manager to resubmit the orphan task");
446    waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
447
448    // now all the nodes are unassigned. manager should post another rescan
449    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to / 2);
450  }
451
452  @Test
453  public void testDeadWorker() throws Exception {
454    LOG.info("testDeadWorker");
455
456    conf.setLong("hbase.splitlog.max.resubmit", 0);
457    slm = new SplitLogManager(master, conf);
458    TaskBatch batch = new TaskBatch();
459
460    String tasknode = submitTaskAndWait(batch, "foo/1");
461    int version = ZKUtil.checkExists(zkw, tasknode);
462    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
463    SplitLogTask slt = new SplitLogTask.Owned(worker1);
464    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
465    if (tot_mgr_heartbeat.sum() == 0) {
466      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
467    }
468    slm.handleDeadWorker(worker1);
469    if (tot_mgr_resubmit.sum() == 0) {
470      waitForCounter(tot_mgr_resubmit, 0, 1, to + to / 2);
471    }
472    if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
473      waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to / 2);
474    }
475
476    int version1 = ZKUtil.checkExists(zkw, tasknode);
477    assertTrue(version1 > version);
478    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
479    slt = SplitLogTask.parseFrom(taskstate);
480    assertTrue(slt.isUnassigned(master.getServerName()));
481    return;
482  }
483
484  @Test
485  public void testWorkerCrash() throws Exception {
486    slm = new SplitLogManager(master, conf);
487    TaskBatch batch = new TaskBatch();
488
489    String tasknode = submitTaskAndWait(batch, "foo/1");
490    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
491
492    SplitLogTask slt = new SplitLogTask.Owned(worker1);
493    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
494    if (tot_mgr_heartbeat.sum() == 0) {
495      waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2);
496    }
497
498    // Not yet resubmitted.
499    assertEquals(0, tot_mgr_resubmit.sum());
500
501    // This server becomes dead
502    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
503
504    Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
505
506    // It has been resubmitted
507    assertEquals(1, tot_mgr_resubmit.sum());
508  }
509
510  @Test
511  public void testEmptyLogDir() throws Exception {
512    LOG.info("testEmptyLogDir");
513    slm = new SplitLogManager(master, conf);
514    FileSystem fs = TEST_UTIL.getTestFileSystem();
515    Path emptyLogDirPath =
516      new Path(new Path(fs.getWorkingDirectory(), HConstants.HREGION_LOGDIR_NAME),
517        ServerName.valueOf("emptyLogDir", 1, 1).toString());
518    fs.mkdirs(emptyLogDirPath);
519    slm.splitLogDistributed(emptyLogDirPath);
520    assertFalse(fs.exists(emptyLogDirPath));
521  }
522
523  @Test
524  public void testLogFilesAreArchived() throws Exception {
525    LOG.info("testLogFilesAreArchived");
526    slm = new SplitLogManager(master, conf);
527    FileSystem fs = TEST_UTIL.getTestFileSystem();
528    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
529    conf.set(HConstants.HBASE_DIR, dir.toString());
530    String serverName = ServerName.valueOf("foo", 1, 1).toString();
531    Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
532    fs.mkdirs(logDirPath);
533    // create an empty log file
534    String logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString()).toString();
535    fs.create(new Path(logDirPath, logFile)).close();
536
537    // spin up a thread mocking split done.
538    new Thread() {
539      @Override
540      public void run() {
541        boolean done = false;
542        while (!done) {
543          for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
544            final ServerName worker1 = ServerName.valueOf("worker1,1,1");
545            SplitLogTask slt = new SplitLogTask.Done(worker1);
546            boolean encounteredZKException = false;
547            try {
548              ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
549            } catch (KeeperException e) {
550              LOG.warn(e.toString(), e);
551              encounteredZKException = true;
552            }
553            if (!encounteredZKException) {
554              done = true;
555            }
556          }
557        }
558      }
559    }.start();
560
561    slm.splitLogDistributed(logDirPath);
562
563    assertFalse(fs.exists(logDirPath));
564  }
565}