001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.hamcrest.CoreMatchers.is;
022import static org.hamcrest.CoreMatchers.not;
023import static org.hamcrest.MatcherAssert.assertThat;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.util.List;
030import java.util.Objects;
031import java.util.concurrent.atomic.LongAdder;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CoordinatedStateManager;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.SplitLogCounters;
040import org.apache.hadoop.hbase.SplitLogTask;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
043import org.apache.hadoop.hbase.executor.ExecutorService;
044import org.apache.hadoop.hbase.executor.ExecutorType;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.testclassification.RegionServerTests;
047import org.apache.hadoop.hbase.util.CancelableProgressable;
048import org.apache.hadoop.hbase.util.MockServer;
049import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
050import org.apache.hadoop.hbase.zookeeper.ZKUtil;
051import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
052import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
053import org.apache.zookeeper.CreateMode;
054import org.apache.zookeeper.ZooDefs.Ids;
055import org.junit.After;
056import org.junit.Before;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063@Category({RegionServerTests.class, MediumTests.class})
064public class TestSplitLogWorker {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestSplitLogWorker.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
071  private static final int WAIT_TIME = 15000;
072  private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
073  private final static HBaseTestingUtil TEST_UTIL =
074    new HBaseTestingUtil();
075  private DummyServer ds;
076  private ZKWatcher zkw;
077  private SplitLogWorker slw;
078  private ExecutorService executorService;
079
080  static class DummyServer extends MockServer {
081    private ZKWatcher zkw;
082    private Configuration conf;
083    private CoordinatedStateManager cm;
084
085    public DummyServer(ZKWatcher zkw, Configuration conf) {
086      this.zkw = zkw;
087      this.conf = conf;
088      cm = new ZkCoordinatedStateManager(this);
089    }
090
091    @Override
092    public Configuration getConfiguration() {
093      return conf;
094    }
095
096    @Override
097    public ZKWatcher getZooKeeper() {
098      return zkw;
099    }
100
101    @Override
102    public CoordinatedStateManager getCoordinatedStateManager() {
103      return cm;
104    }
105  }
106
107  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
108      throws Exception {
109    assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
110      waitForCounterBoolean(ctr, oldval, newval, timems));
111  }
112
113  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
114      long timems) throws Exception {
115
116    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
117  }
118
119  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
120      long timems, boolean failIfTimeout) throws Exception {
121
122    long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
123      new Waiter.Predicate<Exception>() {
124      @Override
125      public boolean evaluate() throws Exception {
126            return (ctr.sum() >= newval);
127      }
128    });
129
130    if( timeWaited > 0) {
131      // when not timed out
132      assertEquals(newval, ctr.sum());
133    }
134    return true;
135  }
136
137  @Before
138  public void setup() throws Exception {
139    TEST_UTIL.startMiniZKCluster();
140    Configuration conf = TEST_UTIL.getConfiguration();
141    zkw = new ZKWatcher(TEST_UTIL.getConfiguration(),
142        "split-log-worker-tests", null);
143    ds = new DummyServer(zkw, conf);
144    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
145    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
146    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1)));
147    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
148    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
149    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1)));
150
151    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
152    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode);
153    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1)));
154
155    SplitLogCounters.resetCounters();
156    executorService = new ExecutorService("TestSplitLogWorker");
157    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
158        ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
159  }
160
161  @After
162  public void teardown() throws Exception {
163    if (executorService != null) {
164      executorService.shutdown();
165    }
166    TEST_UTIL.shutdownMiniZKCluster();
167  }
168
169  SplitLogWorker.TaskExecutor neverEndingTask =
170    new SplitLogWorker.TaskExecutor() {
171
172      @Override
173      public Status exec(String name, CancelableProgressable p) {
174        while (true) {
175          try {
176            Thread.sleep(1000);
177          } catch (InterruptedException e) {
178            return Status.PREEMPTED;
179          }
180          if (!p.progress()) {
181            return Status.PREEMPTED;
182          }
183        }
184      }
185
186  };
187
188  @Test
189  public void testAcquireTaskAtStartup() throws Exception {
190    LOG.info("testAcquireTaskAtStartup");
191    SplitLogCounters.resetCounters();
192    final String TATAS = "tatas";
193    final ServerName RS = ServerName.valueOf("rs,1,1");
194    RegionServerServices mockedRS = getRegionServer(RS);
195    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
196      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
197        Ids.OPEN_ACL_UNSAFE,
198        CreateMode.PERSISTENT);
199
200    SplitLogWorker slw =
201        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
202    slw.start();
203    try {
204      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
205      byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
206      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
207      assertTrue(slt.isOwned(RS));
208    } finally {
209     stopSplitLogWorker(slw);
210    }
211  }
212
213  private void stopSplitLogWorker(final SplitLogWorker slw)
214  throws InterruptedException {
215    if (slw != null) {
216      slw.stop();
217      slw.worker.join(WAIT_TIME);
218      if (slw.worker.isAlive()) {
219        assertTrue(("Could not stop the worker thread slw=" + slw) == null);
220      }
221    }
222  }
223
224  @Test
225  public void testRaceForTask() throws Exception {
226    LOG.info("testRaceForTask");
227    SplitLogCounters.resetCounters();
228    final String TRFT = "trft";
229    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
230    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
231    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
232      new SplitLogTask.Unassigned(MANAGER).toByteArray(),
233        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
234    RegionServerServices mockedRS1 = getRegionServer(SVR1);
235    RegionServerServices mockedRS2 = getRegionServer(SVR2);
236    SplitLogWorker slw1 =
237        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
238    SplitLogWorker slw2 =
239        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
240    slw1.start();
241    slw2.start();
242    try {
243      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
244      // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
245      // not it, that we fell through to the next counter in line and it was set.
246      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
247          WAIT_TIME, false) ||
248        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
249      byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
250      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
251      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
252    } finally {
253      stopSplitLogWorker(slw1);
254      stopSplitLogWorker(slw2);
255    }
256  }
257
258  @Test
259  public void testPreemptTask() throws Exception {
260    LOG.info("testPreemptTask");
261    SplitLogCounters.resetCounters();
262    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
263    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
264    RegionServerServices mockedRS = getRegionServer(SRV);
265    SplitLogWorker slw =
266        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
267    slw.start();
268    try {
269      Thread.yield(); // let the worker start
270      Thread.sleep(1000);
271      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
272
273      // this time create a task node after starting the splitLogWorker
274      zkw.getRecoverableZooKeeper().create(PATH,
275        new SplitLogTask.Unassigned(MANAGER).toByteArray(),
276        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
277
278      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
279      assertEquals(1, slw.getTaskReadySeq());
280      byte [] bytes = ZKUtil.getData(zkw, PATH);
281      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
282      assertTrue(slt.isOwned(SRV));
283      slt = new SplitLogTask.Owned(MANAGER);
284      ZKUtil.setData(zkw, PATH, slt.toByteArray());
285      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
286    } finally {
287      stopSplitLogWorker(slw);
288    }
289  }
290
291  @Test
292  public void testMultipleTasks() throws Exception {
293    LOG.info("testMultipleTasks");
294    SplitLogCounters.resetCounters();
295    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
296    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
297    RegionServerServices mockedRS = getRegionServer(SRV);
298    SplitLogWorker slw =
299        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
300    slw.start();
301    try {
302      Thread.yield(); // let the worker start
303      Thread.sleep(100);
304      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
305
306      SplitLogTask unassignedManager =
307        new SplitLogTask.Unassigned(MANAGER);
308      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
309        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
310
311      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
312      // now the worker is busy doing the above task
313
314      // create another task
315      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
316      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
317        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
318
319      // preempt the first task, have it owned by another worker
320      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
321      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
322      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
323      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
324
325      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
326      assertEquals(2, slw.getTaskReadySeq());
327      byte [] bytes = ZKUtil.getData(zkw, PATH2);
328      slt = SplitLogTask.parseFrom(bytes);
329      assertTrue(slt.isOwned(SRV));
330    } finally {
331      stopSplitLogWorker(slw);
332    }
333  }
334
335  @Test
336  public void testRescan() throws Exception {
337    LOG.info("testRescan");
338    SplitLogCounters.resetCounters();
339    final ServerName SRV = ServerName.valueOf("svr,1,1");
340    RegionServerServices mockedRS = getRegionServer(SRV);
341    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
342    slw.start();
343    Thread.yield(); // let the worker start
344    Thread.sleep(100);
345
346    String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
347    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
348    zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
349      CreateMode.PERSISTENT);
350
351    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
352    // now the worker is busy doing the above task
353
354    // preempt the task, have it owned by another worker
355    ZKUtil.setData(zkw, task, slt.toByteArray());
356    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
357
358    // create a RESCAN node
359    String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
360    rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
361      CreateMode.PERSISTENT_SEQUENTIAL);
362
363    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
364    // RESCAN node might not have been processed if the worker became busy
365    // with the above task. preempt the task again so that now the RESCAN
366    // node is processed
367    ZKUtil.setData(zkw, task, slt.toByteArray());
368    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
369    waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
370
371    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
372    LOG.debug(Objects.toString(nodes));
373    int num = 0;
374    for (String node : nodes) {
375      num++;
376      if (node.startsWith("RESCAN")) {
377        String name = ZKSplitLog.getEncodedNodeName(zkw, node);
378        String fn = ZKSplitLog.getFileName(name);
379        byte [] data = ZKUtil.getData(zkw,
380                ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
381        slt = SplitLogTask.parseFrom(data);
382        assertTrue(slt.toString(), slt.isDone(SRV));
383      }
384    }
385    assertEquals(2, num);
386  }
387
388  @Test
389  public void testAcquireMultiTasks() throws Exception {
390    LOG.info("testAcquireMultiTasks");
391    SplitLogCounters.resetCounters();
392    final String TATAS = "tatas";
393    final ServerName RS = ServerName.valueOf("rs,1,1");
394    final int maxTasks = 3;
395    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
396    testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
397    RegionServerServices mockedRS = getRegionServer(RS);
398    for (int i = 0; i < maxTasks; i++) {
399      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
400        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
401          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
402    }
403
404    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
405    slw.start();
406    try {
407      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
408      for (int i = 0; i < maxTasks; i++) {
409        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
410        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
411        assertTrue(slt.isOwned(RS));
412      }
413    } finally {
414      stopSplitLogWorker(slw);
415    }
416  }
417
418  /**
419   * Create a mocked region server service instance
420   */
421  private RegionServerServices getRegionServer(ServerName name) {
422
423    RegionServerServices mockedServer = mock(RegionServerServices.class);
424    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
425    when(mockedServer.getServerName()).thenReturn(name);
426    when(mockedServer.getZooKeeper()).thenReturn(zkw);
427    when(mockedServer.isStopped()).thenReturn(false);
428    when(mockedServer.getExecutorService()).thenReturn(executorService);
429
430    return mockedServer;
431  }
432
433}