001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.hamcrest.CoreMatchers.is;
022import static org.hamcrest.CoreMatchers.not;
023import static org.hamcrest.MatcherAssert.assertThat;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.List;
031import java.util.Objects;
032import java.util.concurrent.atomic.LongAdder;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.hbase.ChoreService;
036import org.apache.hadoop.hbase.CoordinatedStateManager;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.Server;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.SplitLogCounters;
043import org.apache.hadoop.hbase.SplitLogTask;
044import org.apache.hadoop.hbase.Waiter;
045import org.apache.hadoop.hbase.client.ClusterConnection;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
048import org.apache.hadoop.hbase.executor.ExecutorService;
049import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
050import org.apache.hadoop.hbase.executor.ExecutorType;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.RegionServerTests;
053import org.apache.hadoop.hbase.util.CancelableProgressable;
054import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
055import org.apache.hadoop.hbase.zookeeper.ZKUtil;
056import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
057import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
058import org.apache.zookeeper.CreateMode;
059import org.apache.zookeeper.ZooDefs.Ids;
060import org.junit.After;
061import org.junit.Before;
062import org.junit.ClassRule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068@Category({ RegionServerTests.class, MediumTests.class })
069public class TestSplitLogWorker {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestSplitLogWorker.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
076  private static final int WAIT_TIME = 15000;
077  private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
078  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
079  private DummyServer ds;
080  private ZKWatcher zkw;
081  private SplitLogWorker slw;
082  private ExecutorService executorService;
083
084  static class DummyServer implements Server {
085    private ZKWatcher zkw;
086    private Configuration conf;
087    private CoordinatedStateManager cm;
088
089    public DummyServer(ZKWatcher zkw, Configuration conf) {
090      this.zkw = zkw;
091      this.conf = conf;
092      cm = new ZkCoordinatedStateManager(this);
093    }
094
095    @Override
096    public void abort(String why, Throwable e) {
097    }
098
099    @Override
100    public boolean isAborted() {
101      return false;
102    }
103
104    @Override
105    public void stop(String why) {
106    }
107
108    @Override
109    public boolean isStopped() {
110      return false;
111    }
112
113    @Override
114    public Configuration getConfiguration() {
115      return conf;
116    }
117
118    @Override
119    public ZKWatcher getZooKeeper() {
120      return zkw;
121    }
122
123    @Override
124    public ServerName getServerName() {
125      return null;
126    }
127
128    @Override
129    public CoordinatedStateManager getCoordinatedStateManager() {
130      return cm;
131    }
132
133    @Override
134    public ClusterConnection getConnection() {
135      return null;
136    }
137
138    @Override
139    public ChoreService getChoreService() {
140      return null;
141    }
142
143    @Override
144    public ClusterConnection getClusterConnection() {
145      // TODO Auto-generated method stub
146      return null;
147    }
148
149    @Override
150    public FileSystem getFileSystem() {
151      return null;
152    }
153
154    @Override
155    public boolean isStopping() {
156      return false;
157    }
158
159    @Override
160    public Connection createConnection(Configuration conf) throws IOException {
161      return null;
162    }
163  }
164
165  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
166    throws Exception {
167    assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
168      waitForCounterBoolean(ctr, oldval, newval, timems));
169  }
170
171  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
172    long timems) throws Exception {
173
174    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
175  }
176
177  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
178    long timems, boolean failIfTimeout) throws Exception {
179
180    long timeWaited =
181      TEST_UTIL.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate<Exception>() {
182        @Override
183        public boolean evaluate() throws Exception {
184          return (ctr.sum() >= newval);
185        }
186      });
187
188    if (timeWaited > 0) {
189      // when not timed out
190      assertEquals(newval, ctr.sum());
191    }
192    return true;
193  }
194
195  @Before
196  public void setup() throws Exception {
197    TEST_UTIL.startMiniZKCluster();
198    Configuration conf = TEST_UTIL.getConfiguration();
199    zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null);
200    ds = new DummyServer(zkw, conf);
201    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
202    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
203    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1)));
204    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
205    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
206    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1)));
207
208    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
209    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode);
210    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1)));
211
212    SplitLogCounters.resetCounters();
213    executorService = new ExecutorService("TestSplitLogWorker");
214    executorService.startExecutorService(executorService.new ExecutorConfig()
215      .setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
216  }
217
218  @After
219  public void teardown() throws Exception {
220    if (executorService != null) {
221      executorService.shutdown();
222    }
223    TEST_UTIL.shutdownMiniZKCluster();
224  }
225
226  SplitLogWorker.TaskExecutor neverEndingTask = new SplitLogWorker.TaskExecutor() {
227
228    @Override
229    public Status exec(String name, CancelableProgressable p) {
230      while (true) {
231        try {
232          Thread.sleep(1000);
233        } catch (InterruptedException e) {
234          return Status.PREEMPTED;
235        }
236        if (!p.progress()) {
237          return Status.PREEMPTED;
238        }
239      }
240    }
241
242  };
243
244  @Test
245  public void testAcquireTaskAtStartup() throws Exception {
246    LOG.info("testAcquireTaskAtStartup");
247    SplitLogCounters.resetCounters();
248    final String TATAS = "tatas";
249    final ServerName RS = ServerName.valueOf("rs,1,1");
250    RegionServerServices mockedRS = getRegionServer(RS);
251    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
252      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
253      CreateMode.PERSISTENT);
254
255    SplitLogWorker slw =
256      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
257    slw.start();
258    try {
259      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
260      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
261      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
262      assertTrue(slt.isOwned(RS));
263    } finally {
264      stopSplitLogWorker(slw);
265    }
266  }
267
268  private void stopSplitLogWorker(final SplitLogWorker slw) throws InterruptedException {
269    if (slw != null) {
270      slw.stop();
271      slw.worker.join(WAIT_TIME);
272      if (slw.worker.isAlive()) {
273        assertTrue(("Could not stop the worker thread slw=" + slw) == null);
274      }
275    }
276  }
277
278  @Test
279  public void testRaceForTask() throws Exception {
280    LOG.info("testRaceForTask");
281    SplitLogCounters.resetCounters();
282    final String TRFT = "trft";
283    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
284    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
285    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
286      new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
287      CreateMode.PERSISTENT);
288    RegionServerServices mockedRS1 = getRegionServer(SVR1);
289    RegionServerServices mockedRS2 = getRegionServer(SVR2);
290    SplitLogWorker slw1 =
291      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
292    SplitLogWorker slw2 =
293      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
294    slw1.start();
295    slw2.start();
296    try {
297      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
298      // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
299      // not it, that we fell through to the next counter in line and it was set.
300      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
301        WAIT_TIME, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
302      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
303      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
304      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
305    } finally {
306      stopSplitLogWorker(slw1);
307      stopSplitLogWorker(slw2);
308    }
309  }
310
311  @Test
312  public void testPreemptTask() throws Exception {
313    LOG.info("testPreemptTask");
314    SplitLogCounters.resetCounters();
315    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
316    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
317    RegionServerServices mockedRS = getRegionServer(SRV);
318    SplitLogWorker slw =
319      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
320    slw.start();
321    try {
322      Thread.yield(); // let the worker start
323      Thread.sleep(1000);
324      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
325
326      // this time create a task node after starting the splitLogWorker
327      zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(MANAGER).toByteArray(),
328        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
329
330      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
331      assertEquals(1, slw.getTaskReadySeq());
332      byte[] bytes = ZKUtil.getData(zkw, PATH);
333      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
334      assertTrue(slt.isOwned(SRV));
335      slt = new SplitLogTask.Owned(MANAGER);
336      ZKUtil.setData(zkw, PATH, slt.toByteArray());
337      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
338    } finally {
339      stopSplitLogWorker(slw);
340    }
341  }
342
343  @Test
344  public void testMultipleTasks() throws Exception {
345    LOG.info("testMultipleTasks");
346    SplitLogCounters.resetCounters();
347    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
348    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
349    RegionServerServices mockedRS = getRegionServer(SRV);
350    SplitLogWorker slw =
351      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
352    slw.start();
353    try {
354      Thread.yield(); // let the worker start
355      Thread.sleep(100);
356      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
357
358      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
359      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
360        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
361
362      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
363      // now the worker is busy doing the above task
364
365      // create another task
366      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
367      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
368        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
369
370      // preempt the first task, have it owned by another worker
371      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
372      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
373      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
374      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
375
376      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
377      assertEquals(2, slw.getTaskReadySeq());
378      byte[] bytes = ZKUtil.getData(zkw, PATH2);
379      slt = SplitLogTask.parseFrom(bytes);
380      assertTrue(slt.isOwned(SRV));
381    } finally {
382      stopSplitLogWorker(slw);
383    }
384  }
385
386  @Test
387  public void testRescan() throws Exception {
388    LOG.info("testRescan");
389    SplitLogCounters.resetCounters();
390    final ServerName SRV = ServerName.valueOf("svr,1,1");
391    RegionServerServices mockedRS = getRegionServer(SRV);
392    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
393    slw.start();
394    Thread.yield(); // let the worker start
395    Thread.sleep(100);
396
397    String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
398    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
399    zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
400      CreateMode.PERSISTENT);
401
402    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
403    // now the worker is busy doing the above task
404
405    // preempt the task, have it owned by another worker
406    ZKUtil.setData(zkw, task, slt.toByteArray());
407    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
408
409    // create a RESCAN node
410    String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
411    rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
412      CreateMode.PERSISTENT_SEQUENTIAL);
413
414    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
415    // RESCAN node might not have been processed if the worker became busy
416    // with the above task. preempt the task again so that now the RESCAN
417    // node is processed
418    ZKUtil.setData(zkw, task, slt.toByteArray());
419    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
420    waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
421
422    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
423    LOG.debug(Objects.toString(nodes));
424    int num = 0;
425    for (String node : nodes) {
426      num++;
427      if (node.startsWith("RESCAN")) {
428        String name = ZKSplitLog.getEncodedNodeName(zkw, node);
429        String fn = ZKSplitLog.getFileName(name);
430        byte[] data =
431          ZKUtil.getData(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
432        slt = SplitLogTask.parseFrom(data);
433        assertTrue(slt.toString(), slt.isDone(SRV));
434      }
435    }
436    assertEquals(2, num);
437  }
438
439  @Test
440  public void testAcquireMultiTasks() throws Exception {
441    LOG.info("testAcquireMultiTasks");
442    SplitLogCounters.resetCounters();
443    final String TATAS = "tatas";
444    final ServerName RS = ServerName.valueOf("rs,1,1");
445    final int maxTasks = 3;
446    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
447    testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
448    RegionServerServices mockedRS = getRegionServer(RS);
449    for (int i = 0; i < maxTasks; i++) {
450      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
451        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
452        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
453    }
454
455    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
456    slw.start();
457    try {
458      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
459      for (int i = 0; i < maxTasks; i++) {
460        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
461        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
462        assertTrue(slt.isOwned(RS));
463      }
464    } finally {
465      stopSplitLogWorker(slw);
466    }
467  }
468
469  /**
470   * Create a mocked region server service instance
471   */
472  private RegionServerServices getRegionServer(ServerName name) {
473
474    RegionServerServices mockedServer = mock(RegionServerServices.class);
475    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
476    when(mockedServer.getServerName()).thenReturn(name);
477    when(mockedServer.getZooKeeper()).thenReturn(zkw);
478    when(mockedServer.isStopped()).thenReturn(false);
479    when(mockedServer.getExecutorService()).thenReturn(executorService);
480
481    return mockedServer;
482  }
483
484}