001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.hamcrest.CoreMatchers.is;
022import static org.hamcrest.CoreMatchers.not;
023import static org.hamcrest.MatcherAssert.assertThat;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.util.List;
030import java.util.Objects;
031import java.util.concurrent.atomic.LongAdder;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CoordinatedStateManager;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.SplitLogCounters;
039import org.apache.hadoop.hbase.SplitLogTask;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
042import org.apache.hadoop.hbase.executor.ExecutorService;
043import org.apache.hadoop.hbase.executor.ExecutorType;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.util.CancelableProgressable;
047import org.apache.hadoop.hbase.util.MockServer;
048import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
049import org.apache.hadoop.hbase.zookeeper.ZKUtil;
050import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
051import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
052import org.apache.zookeeper.CreateMode;
053import org.apache.zookeeper.ZooDefs.Ids;
054import org.junit.After;
055import org.junit.Before;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062@Category({ RegionServerTests.class, MediumTests.class })
063public class TestSplitLogWorker {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestSplitLogWorker.class);
068
069  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
070  private static final int WAIT_TIME = 15000;
071  private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
072  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
073  private DummyServer ds;
074  private ZKWatcher zkw;
075  private SplitLogWorker slw;
076  private ExecutorService executorService;
077
078  static class DummyServer extends MockServer {
079    private ZKWatcher zkw;
080    private Configuration conf;
081    private CoordinatedStateManager cm;
082
083    public DummyServer(ZKWatcher zkw, Configuration conf) {
084      this.zkw = zkw;
085      this.conf = conf;
086      cm = new ZkCoordinatedStateManager(this);
087    }
088
089    @Override
090    public Configuration getConfiguration() {
091      return conf;
092    }
093
094    @Override
095    public ZKWatcher getZooKeeper() {
096      return zkw;
097    }
098
099    @Override
100    public CoordinatedStateManager getCoordinatedStateManager() {
101      return cm;
102    }
103  }
104
105  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
106    throws Exception {
107    assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
108      waitForCounterBoolean(ctr, oldval, newval, timems));
109  }
110
111  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
112    long timems) throws Exception {
113
114    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
115  }
116
117  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
118    long timems, boolean failIfTimeout) throws Exception {
119
120    long timeWaited =
121      TEST_UTIL.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate<Exception>() {
122        @Override
123        public boolean evaluate() throws Exception {
124          return (ctr.sum() >= newval);
125        }
126      });
127
128    if (timeWaited > 0) {
129      // when not timed out
130      assertEquals(newval, ctr.sum());
131    }
132    return true;
133  }
134
135  @Before
136  public void setup() throws Exception {
137    TEST_UTIL.startMiniZKCluster();
138    Configuration conf = TEST_UTIL.getConfiguration();
139    zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null);
140    ds = new DummyServer(zkw, conf);
141    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
142    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
143    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1)));
144    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
145    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
146    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1)));
147
148    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
149    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode);
150    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1)));
151
152    SplitLogCounters.resetCounters();
153    executorService = new ExecutorService("TestSplitLogWorker");
154    executorService.startExecutorService(executorService.new ExecutorConfig()
155      .setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
156  }
157
158  @After
159  public void teardown() throws Exception {
160    if (executorService != null) {
161      executorService.shutdown();
162    }
163    TEST_UTIL.shutdownMiniZKCluster();
164  }
165
166  SplitLogWorker.TaskExecutor neverEndingTask = new SplitLogWorker.TaskExecutor() {
167
168    @Override
169    public Status exec(String name, CancelableProgressable p) {
170      while (true) {
171        try {
172          Thread.sleep(1000);
173        } catch (InterruptedException e) {
174          return Status.PREEMPTED;
175        }
176        if (!p.progress()) {
177          return Status.PREEMPTED;
178        }
179      }
180    }
181
182  };
183
184  @Test
185  public void testAcquireTaskAtStartup() throws Exception {
186    LOG.info("testAcquireTaskAtStartup");
187    SplitLogCounters.resetCounters();
188    final String TATAS = "tatas";
189    final ServerName RS = ServerName.valueOf("rs,1,1");
190    RegionServerServices mockedRS = getRegionServer(RS);
191    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
192      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
193      CreateMode.PERSISTENT);
194
195    SplitLogWorker slw =
196      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
197    slw.start();
198    try {
199      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
200      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
201      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
202      assertTrue(slt.isOwned(RS));
203    } finally {
204      stopSplitLogWorker(slw);
205    }
206  }
207
208  private void stopSplitLogWorker(final SplitLogWorker slw) throws InterruptedException {
209    if (slw != null) {
210      slw.stop();
211      slw.worker.join(WAIT_TIME);
212      if (slw.worker.isAlive()) {
213        assertTrue(("Could not stop the worker thread slw=" + slw) == null);
214      }
215    }
216  }
217
218  @Test
219  public void testRaceForTask() throws Exception {
220    LOG.info("testRaceForTask");
221    SplitLogCounters.resetCounters();
222    final String TRFT = "trft";
223    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
224    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
225    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
226      new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
227      CreateMode.PERSISTENT);
228    RegionServerServices mockedRS1 = getRegionServer(SVR1);
229    RegionServerServices mockedRS2 = getRegionServer(SVR2);
230    SplitLogWorker slw1 =
231      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
232    SplitLogWorker slw2 =
233      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
234    slw1.start();
235    slw2.start();
236    try {
237      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
238      // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
239      // not it, that we fell through to the next counter in line and it was set.
240      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
241        WAIT_TIME, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
242      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
243      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
244      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
245    } finally {
246      stopSplitLogWorker(slw1);
247      stopSplitLogWorker(slw2);
248    }
249  }
250
251  @Test
252  public void testPreemptTask() throws Exception {
253    LOG.info("testPreemptTask");
254    SplitLogCounters.resetCounters();
255    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
256    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
257    RegionServerServices mockedRS = getRegionServer(SRV);
258    SplitLogWorker slw =
259      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
260    slw.start();
261    try {
262      Thread.yield(); // let the worker start
263      Thread.sleep(1000);
264      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
265
266      // this time create a task node after starting the splitLogWorker
267      zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(MANAGER).toByteArray(),
268        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
269
270      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
271      assertEquals(1, slw.getTaskReadySeq());
272      byte[] bytes = ZKUtil.getData(zkw, PATH);
273      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
274      assertTrue(slt.isOwned(SRV));
275      slt = new SplitLogTask.Owned(MANAGER);
276      ZKUtil.setData(zkw, PATH, slt.toByteArray());
277      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
278    } finally {
279      stopSplitLogWorker(slw);
280    }
281  }
282
283  @Test
284  public void testMultipleTasks() throws Exception {
285    LOG.info("testMultipleTasks");
286    SplitLogCounters.resetCounters();
287    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
288    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
289    RegionServerServices mockedRS = getRegionServer(SRV);
290    SplitLogWorker slw =
291      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
292    slw.start();
293    try {
294      Thread.yield(); // let the worker start
295      Thread.sleep(100);
296      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
297
298      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
299      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
300        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
301
302      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
303      // now the worker is busy doing the above task
304
305      // create another task
306      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
307      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
308        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
309
310      // preempt the first task, have it owned by another worker
311      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
312      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
313      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
314      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
315
316      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
317      assertEquals(2, slw.getTaskReadySeq());
318      byte[] bytes = ZKUtil.getData(zkw, PATH2);
319      slt = SplitLogTask.parseFrom(bytes);
320      assertTrue(slt.isOwned(SRV));
321    } finally {
322      stopSplitLogWorker(slw);
323    }
324  }
325
326  @Test
327  public void testRescan() throws Exception {
328    LOG.info("testRescan");
329    SplitLogCounters.resetCounters();
330    final ServerName SRV = ServerName.valueOf("svr,1,1");
331    RegionServerServices mockedRS = getRegionServer(SRV);
332    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
333    slw.start();
334    Thread.yield(); // let the worker start
335    Thread.sleep(100);
336
337    String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
338    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
339    zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
340      CreateMode.PERSISTENT);
341
342    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
343    // now the worker is busy doing the above task
344
345    // preempt the task, have it owned by another worker
346    ZKUtil.setData(zkw, task, slt.toByteArray());
347    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
348
349    // create a RESCAN node
350    String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
351    rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
352      CreateMode.PERSISTENT_SEQUENTIAL);
353
354    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
355    // RESCAN node might not have been processed if the worker became busy
356    // with the above task. preempt the task again so that now the RESCAN
357    // node is processed
358    ZKUtil.setData(zkw, task, slt.toByteArray());
359    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
360    waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
361
362    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
363    LOG.debug(Objects.toString(nodes));
364    int num = 0;
365    for (String node : nodes) {
366      num++;
367      if (node.startsWith("RESCAN")) {
368        String name = ZKSplitLog.getEncodedNodeName(zkw, node);
369        String fn = ZKSplitLog.getFileName(name);
370        byte[] data =
371          ZKUtil.getData(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
372        slt = SplitLogTask.parseFrom(data);
373        assertTrue(slt.toString(), slt.isDone(SRV));
374      }
375    }
376    assertEquals(2, num);
377  }
378
379  @Test
380  public void testAcquireMultiTasks() throws Exception {
381    LOG.info("testAcquireMultiTasks");
382    SplitLogCounters.resetCounters();
383    final String TATAS = "tatas";
384    final ServerName RS = ServerName.valueOf("rs,1,1");
385    final int maxTasks = 3;
386    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
387    testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
388    RegionServerServices mockedRS = getRegionServer(RS);
389    for (int i = 0; i < maxTasks; i++) {
390      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
391        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
392        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
393    }
394
395    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
396    slw.start();
397    try {
398      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
399      for (int i = 0; i < maxTasks; i++) {
400        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
401        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
402        assertTrue(slt.isOwned(RS));
403      }
404    } finally {
405      stopSplitLogWorker(slw);
406    }
407  }
408
409  /**
410   * Create a mocked region server service instance
411   */
412  private RegionServerServices getRegionServer(ServerName name) {
413
414    RegionServerServices mockedServer = mock(RegionServerServices.class);
415    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
416    when(mockedServer.getServerName()).thenReturn(name);
417    when(mockedServer.getZooKeeper()).thenReturn(zkw);
418    when(mockedServer.isStopped()).thenReturn(false);
419    when(mockedServer.getExecutorService()).thenReturn(executorService);
420
421    return mockedServer;
422  }
423
424}