001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.hamcrest.CoreMatchers.is;
022import static org.hamcrest.CoreMatchers.not;
023import static org.hamcrest.MatcherAssert.assertThat;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.util.List;
030import java.util.Objects;
031import java.util.concurrent.atomic.LongAdder;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CoordinatedStateManager;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.SplitLogCounters;
038import org.apache.hadoop.hbase.SplitLogTask;
039import org.apache.hadoop.hbase.Waiter;
040import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
041import org.apache.hadoop.hbase.executor.ExecutorService;
042import org.apache.hadoop.hbase.executor.ExecutorType;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.testclassification.RegionServerTests;
045import org.apache.hadoop.hbase.util.CancelableProgressable;
046import org.apache.hadoop.hbase.util.MockServer;
047import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
048import org.apache.hadoop.hbase.zookeeper.ZKUtil;
049import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
050import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
051import org.apache.zookeeper.CreateMode;
052import org.apache.zookeeper.ZooDefs.Ids;
053import org.junit.jupiter.api.AfterEach;
054import org.junit.jupiter.api.BeforeEach;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.Test;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060@Tag(RegionServerTests.TAG)
061@Tag(MediumTests.TAG)
062public class TestSplitLogWorker {
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
065  private static final int WAIT_TIME = 15000;
066  private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
067  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
068  private DummyServer ds;
069  private ZKWatcher zkw;
070  private SplitLogWorker slw;
071  private ExecutorService executorService;
072
073  static class DummyServer extends MockServer {
074    private ZKWatcher zkw;
075    private Configuration conf;
076    private CoordinatedStateManager cm;
077
078    public DummyServer(ZKWatcher zkw, Configuration conf) {
079      this.zkw = zkw;
080      this.conf = conf;
081      cm = new ZkCoordinatedStateManager(this);
082    }
083
084    @Override
085    public Configuration getConfiguration() {
086      return conf;
087    }
088
089    @Override
090    public ZKWatcher getZooKeeper() {
091      return zkw;
092    }
093
094    @Override
095    public CoordinatedStateManager getCoordinatedStateManager() {
096      return cm;
097    }
098  }
099
100  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
101    throws Exception {
102    assertTrue(waitForCounterBoolean(ctr, oldval, newval, timems),
103      "ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval);
104  }
105
106  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
107    long timems) throws Exception {
108
109    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
110  }
111
112  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
113    long timems, boolean failIfTimeout) throws Exception {
114
115    long timeWaited =
116      TEST_UTIL.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate<Exception>() {
117        @Override
118        public boolean evaluate() throws Exception {
119          return (ctr.sum() >= newval);
120        }
121      });
122
123    if (timeWaited > 0) {
124      // when not timed out
125      assertEquals(newval, ctr.sum());
126    }
127    return true;
128  }
129
130  @BeforeEach
131  public void setup() throws Exception {
132    TEST_UTIL.startMiniZKCluster();
133    Configuration conf = TEST_UTIL.getConfiguration();
134    zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null);
135    ds = new DummyServer(zkw, conf);
136    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
137    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
138    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1)));
139    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
140    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
141    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1)));
142
143    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
144    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode);
145    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1)));
146
147    SplitLogCounters.resetCounters();
148    executorService = new ExecutorService("TestSplitLogWorker");
149    executorService.startExecutorService(executorService.new ExecutorConfig()
150      .setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
151  }
152
153  @AfterEach
154  public void teardown() throws Exception {
155    if (executorService != null) {
156      executorService.shutdown();
157    }
158    TEST_UTIL.shutdownMiniZKCluster();
159  }
160
161  SplitLogWorker.TaskExecutor neverEndingTask = new SplitLogWorker.TaskExecutor() {
162
163    @Override
164    public Status exec(String name, CancelableProgressable p) {
165      while (true) {
166        try {
167          Thread.sleep(1000);
168        } catch (InterruptedException e) {
169          return Status.PREEMPTED;
170        }
171        if (!p.progress()) {
172          return Status.PREEMPTED;
173        }
174      }
175    }
176
177  };
178
179  @Test
180  public void testAcquireTaskAtStartup() throws Exception {
181    LOG.info("testAcquireTaskAtStartup");
182    SplitLogCounters.resetCounters();
183    final String TATAS = "tatas";
184    final ServerName RS = ServerName.valueOf("rs,1,1");
185    RegionServerServices mockedRS = getRegionServer(RS);
186    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
187      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
188      CreateMode.PERSISTENT);
189
190    SplitLogWorker slw =
191      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
192    slw.start();
193    try {
194      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
195      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
196      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
197      assertTrue(slt.isOwned(RS));
198    } finally {
199      stopSplitLogWorker(slw);
200    }
201  }
202
203  private void stopSplitLogWorker(final SplitLogWorker slw) throws InterruptedException {
204    if (slw != null) {
205      slw.stop();
206      slw.worker.join(WAIT_TIME);
207      if (slw.worker.isAlive()) {
208        assertTrue(("Could not stop the worker thread slw=" + slw) == null);
209      }
210    }
211  }
212
213  @Test
214  public void testRaceForTask() throws Exception {
215    LOG.info("testRaceForTask");
216    SplitLogCounters.resetCounters();
217    final String TRFT = "trft";
218    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
219    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
220    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
221      new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
222      CreateMode.PERSISTENT);
223    RegionServerServices mockedRS1 = getRegionServer(SVR1);
224    RegionServerServices mockedRS2 = getRegionServer(SVR2);
225    SplitLogWorker slw1 =
226      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
227    SplitLogWorker slw2 =
228      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
229    slw1.start();
230    slw2.start();
231    try {
232      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
233      // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
234      // not it, that we fell through to the next counter in line and it was set.
235      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
236        WAIT_TIME, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
237      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
238      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
239      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
240    } finally {
241      stopSplitLogWorker(slw1);
242      stopSplitLogWorker(slw2);
243    }
244  }
245
246  @Test
247  public void testPreemptTask() throws Exception {
248    LOG.info("testPreemptTask");
249    SplitLogCounters.resetCounters();
250    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
251    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
252    RegionServerServices mockedRS = getRegionServer(SRV);
253    SplitLogWorker slw =
254      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
255    slw.start();
256    try {
257      Thread.yield(); // let the worker start
258      Thread.sleep(1000);
259      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
260
261      // this time create a task node after starting the splitLogWorker
262      zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(MANAGER).toByteArray(),
263        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
264
265      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
266      assertEquals(1, slw.getTaskReadySeq());
267      byte[] bytes = ZKUtil.getData(zkw, PATH);
268      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
269      assertTrue(slt.isOwned(SRV));
270      slt = new SplitLogTask.Owned(MANAGER);
271      ZKUtil.setData(zkw, PATH, slt.toByteArray());
272      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
273    } finally {
274      stopSplitLogWorker(slw);
275    }
276  }
277
278  @Test
279  public void testMultipleTasks() throws Exception {
280    LOG.info("testMultipleTasks");
281    SplitLogCounters.resetCounters();
282    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
283    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
284    RegionServerServices mockedRS = getRegionServer(SRV);
285    SplitLogWorker slw =
286      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
287    slw.start();
288    try {
289      Thread.yield(); // let the worker start
290      Thread.sleep(100);
291      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
292
293      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
294      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
295        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
296
297      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
298      // now the worker is busy doing the above task
299
300      // create another task
301      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
302      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
303        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
304
305      // preempt the first task, have it owned by another worker
306      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
307      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
308      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
309      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
310
311      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
312      assertEquals(2, slw.getTaskReadySeq());
313      byte[] bytes = ZKUtil.getData(zkw, PATH2);
314      slt = SplitLogTask.parseFrom(bytes);
315      assertTrue(slt.isOwned(SRV));
316    } finally {
317      stopSplitLogWorker(slw);
318    }
319  }
320
321  @Test
322  public void testRescan() throws Exception {
323    LOG.info("testRescan");
324    SplitLogCounters.resetCounters();
325    final ServerName SRV = ServerName.valueOf("svr,1,1");
326    RegionServerServices mockedRS = getRegionServer(SRV);
327    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
328    slw.start();
329    Thread.yield(); // let the worker start
330    Thread.sleep(100);
331
332    String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
333    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
334    zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
335      CreateMode.PERSISTENT);
336
337    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
338    // now the worker is busy doing the above task
339
340    // preempt the task, have it owned by another worker
341    ZKUtil.setData(zkw, task, slt.toByteArray());
342    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
343
344    // create a RESCAN node
345    String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
346    rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
347      CreateMode.PERSISTENT_SEQUENTIAL);
348
349    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
350    // RESCAN node might not have been processed if the worker became busy
351    // with the above task. preempt the task again so that now the RESCAN
352    // node is processed
353    ZKUtil.setData(zkw, task, slt.toByteArray());
354    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
355    waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
356
357    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
358    LOG.debug(Objects.toString(nodes));
359    int num = 0;
360    for (String node : nodes) {
361      num++;
362      if (node.startsWith("RESCAN")) {
363        String name = ZKSplitLog.getEncodedNodeName(zkw, node);
364        String fn = ZKSplitLog.getFileName(name);
365        byte[] data =
366          ZKUtil.getData(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
367        slt = SplitLogTask.parseFrom(data);
368        assertTrue(slt.isDone(SRV), slt.toString());
369      }
370    }
371    assertEquals(2, num);
372  }
373
374  @Test
375  public void testAcquireMultiTasks() throws Exception {
376    LOG.info("testAcquireMultiTasks");
377    SplitLogCounters.resetCounters();
378    final String TATAS = "tatas";
379    final ServerName RS = ServerName.valueOf("rs,1,1");
380    final int maxTasks = 3;
381    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
382    testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
383    RegionServerServices mockedRS = getRegionServer(RS);
384    for (int i = 0; i < maxTasks; i++) {
385      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
386        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
387        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
388    }
389
390    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
391    slw.start();
392    try {
393      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
394      for (int i = 0; i < maxTasks; i++) {
395        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
396        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
397        assertTrue(slt.isOwned(RS));
398      }
399    } finally {
400      stopSplitLogWorker(slw);
401    }
402  }
403
404  /**
405   * Create a mocked region server service instance
406   */
407  private RegionServerServices getRegionServer(ServerName name) {
408
409    RegionServerServices mockedServer = mock(RegionServerServices.class);
410    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
411    when(mockedServer.getServerName()).thenReturn(name);
412    when(mockedServer.getZooKeeper()).thenReturn(zkw);
413    when(mockedServer.isStopped()).thenReturn(false);
414    when(mockedServer.getExecutorService()).thenReturn(executorService);
415
416    return mockedServer;
417  }
418
419}