001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.hamcrest.CoreMatchers.is;
021import static org.hamcrest.CoreMatchers.not;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertThat;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.List;
030import java.util.Objects;
031import java.util.concurrent.atomic.LongAdder;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.hbase.ChoreService;
035import org.apache.hadoop.hbase.CoordinatedStateManager;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.SplitLogCounters;
042import org.apache.hadoop.hbase.SplitLogTask;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.client.ClusterConnection;
045import org.apache.hadoop.hbase.client.Connection;
046import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
047import org.apache.hadoop.hbase.executor.ExecutorService;
048import org.apache.hadoop.hbase.executor.ExecutorType;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.testclassification.RegionServerTests;
051import org.apache.hadoop.hbase.util.CancelableProgressable;
052import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
053import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
054import org.apache.hadoop.hbase.zookeeper.ZKUtil;
055import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
056import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
057import org.apache.zookeeper.CreateMode;
058import org.apache.zookeeper.ZooDefs.Ids;
059import org.junit.After;
060import org.junit.Before;
061import org.junit.ClassRule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067@Category({RegionServerTests.class, MediumTests.class})
068public class TestSplitLogWorker {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072      HBaseClassTestRule.forClass(TestSplitLogWorker.class);
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
075  private static final int WAIT_TIME = 15000;
076  private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
077  private final static HBaseTestingUtility TEST_UTIL =
078    new HBaseTestingUtility();
079  private DummyServer ds;
080  private ZKWatcher zkw;
081  private SplitLogWorker slw;
082  private ExecutorService executorService;
083
084  static class DummyServer implements Server {
085    private ZKWatcher zkw;
086    private Configuration conf;
087    private CoordinatedStateManager cm;
088
089    public DummyServer(ZKWatcher zkw, Configuration conf) {
090      this.zkw = zkw;
091      this.conf = conf;
092      cm = new ZkCoordinatedStateManager(this);
093    }
094
095    @Override
096    public void abort(String why, Throwable e) {
097    }
098
099    @Override
100    public boolean isAborted() {
101      return false;
102    }
103
104    @Override
105    public void stop(String why) {
106    }
107
108    @Override
109    public boolean isStopped() {
110      return false;
111    }
112
113    @Override
114    public Configuration getConfiguration() {
115      return conf;
116    }
117
118    @Override
119    public ZKWatcher getZooKeeper() {
120      return zkw;
121    }
122
123    @Override
124    public ServerName getServerName() {
125      return null;
126    }
127
128    @Override
129    public CoordinatedStateManager getCoordinatedStateManager() {
130      return cm;
131    }
132
133    @Override
134    public ClusterConnection getConnection() {
135      return null;
136    }
137
138    @Override
139    public MetaTableLocator getMetaTableLocator() {
140      return null;
141    }
142
143    @Override
144    public ChoreService getChoreService() {
145      return null;
146    }
147
148    @Override
149    public ClusterConnection getClusterConnection() {
150      // TODO Auto-generated method stub
151      return null;
152    }
153
154    @Override
155    public FileSystem getFileSystem() {
156      return null;
157    }
158
159    @Override
160    public boolean isStopping() {
161      return false;
162    }
163
164    @Override
165    public Connection createConnection(Configuration conf) throws IOException {
166      return null;
167    }
168  }
169
170  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
171      throws Exception {
172    assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
173      waitForCounterBoolean(ctr, oldval, newval, timems));
174  }
175
176  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
177      long timems) throws Exception {
178
179    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
180  }
181
182  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
183      long timems, boolean failIfTimeout) throws Exception {
184
185    long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
186      new Waiter.Predicate<Exception>() {
187      @Override
188      public boolean evaluate() throws Exception {
189            return (ctr.sum() >= newval);
190      }
191    });
192
193    if( timeWaited > 0) {
194      // when not timed out
195      assertEquals(newval, ctr.sum());
196    }
197    return true;
198  }
199
200  @Before
201  public void setup() throws Exception {
202    TEST_UTIL.startMiniZKCluster();
203    Configuration conf = TEST_UTIL.getConfiguration();
204    zkw = new ZKWatcher(TEST_UTIL.getConfiguration(),
205        "split-log-worker-tests", null);
206    ds = new DummyServer(zkw, conf);
207    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
208    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
209    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1)));
210    LOG.debug(zkw.getZNodePaths().baseZNode + " created");
211    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
212    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1)));
213
214    LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
215    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode);
216    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1)));
217
218    SplitLogCounters.resetCounters();
219    executorService = new ExecutorService("TestSplitLogWorker");
220    executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
221  }
222
223  @After
224  public void teardown() throws Exception {
225    if (executorService != null) {
226      executorService.shutdown();
227    }
228    TEST_UTIL.shutdownMiniZKCluster();
229  }
230
231  SplitLogWorker.TaskExecutor neverEndingTask =
232    new SplitLogWorker.TaskExecutor() {
233
234      @Override
235      public Status exec(String name, CancelableProgressable p) {
236        while (true) {
237          try {
238            Thread.sleep(1000);
239          } catch (InterruptedException e) {
240            return Status.PREEMPTED;
241          }
242          if (!p.progress()) {
243            return Status.PREEMPTED;
244          }
245        }
246      }
247
248  };
249
250  @Test
251  public void testAcquireTaskAtStartup() throws Exception {
252    LOG.info("testAcquireTaskAtStartup");
253    SplitLogCounters.resetCounters();
254    final String TATAS = "tatas";
255    final ServerName RS = ServerName.valueOf("rs,1,1");
256    RegionServerServices mockedRS = getRegionServer(RS);
257    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
258      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
259        Ids.OPEN_ACL_UNSAFE,
260        CreateMode.PERSISTENT);
261
262    SplitLogWorker slw =
263        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
264    slw.start();
265    try {
266      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
267      byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
268      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
269      assertTrue(slt.isOwned(RS));
270    } finally {
271     stopSplitLogWorker(slw);
272    }
273  }
274
275  private void stopSplitLogWorker(final SplitLogWorker slw)
276  throws InterruptedException {
277    if (slw != null) {
278      slw.stop();
279      slw.worker.join(WAIT_TIME);
280      if (slw.worker.isAlive()) {
281        assertTrue(("Could not stop the worker thread slw=" + slw) == null);
282      }
283    }
284  }
285
286  @Test
287  public void testRaceForTask() throws Exception {
288    LOG.info("testRaceForTask");
289    SplitLogCounters.resetCounters();
290    final String TRFT = "trft";
291    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
292    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
293    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
294      new SplitLogTask.Unassigned(MANAGER).toByteArray(),
295        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
296    RegionServerServices mockedRS1 = getRegionServer(SVR1);
297    RegionServerServices mockedRS2 = getRegionServer(SVR2);
298    SplitLogWorker slw1 =
299        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
300    SplitLogWorker slw2 =
301        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
302    slw1.start();
303    slw2.start();
304    try {
305      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
306      // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
307      // not it, that we fell through to the next counter in line and it was set.
308      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
309          WAIT_TIME, false) ||
310        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
311      byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
312      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
313      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
314    } finally {
315      stopSplitLogWorker(slw1);
316      stopSplitLogWorker(slw2);
317    }
318  }
319
320  @Test
321  public void testPreemptTask() throws Exception {
322    LOG.info("testPreemptTask");
323    SplitLogCounters.resetCounters();
324    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
325    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
326    RegionServerServices mockedRS = getRegionServer(SRV);
327    SplitLogWorker slw =
328        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
329    slw.start();
330    try {
331      Thread.yield(); // let the worker start
332      Thread.sleep(1000);
333      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
334
335      // this time create a task node after starting the splitLogWorker
336      zkw.getRecoverableZooKeeper().create(PATH,
337        new SplitLogTask.Unassigned(MANAGER).toByteArray(),
338        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
339
340      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
341      assertEquals(1, slw.getTaskReadySeq());
342      byte [] bytes = ZKUtil.getData(zkw, PATH);
343      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
344      assertTrue(slt.isOwned(SRV));
345      slt = new SplitLogTask.Owned(MANAGER);
346      ZKUtil.setData(zkw, PATH, slt.toByteArray());
347      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
348    } finally {
349      stopSplitLogWorker(slw);
350    }
351  }
352
353  @Test
354  public void testMultipleTasks() throws Exception {
355    LOG.info("testMultipleTasks");
356    SplitLogCounters.resetCounters();
357    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
358    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
359    RegionServerServices mockedRS = getRegionServer(SRV);
360    SplitLogWorker slw =
361        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
362    slw.start();
363    try {
364      Thread.yield(); // let the worker start
365      Thread.sleep(100);
366      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
367
368      SplitLogTask unassignedManager =
369        new SplitLogTask.Unassigned(MANAGER);
370      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
371        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
372
373      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
374      // now the worker is busy doing the above task
375
376      // create another task
377      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
378      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
379        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
380
381      // preempt the first task, have it owned by another worker
382      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
383      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
384      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
385      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
386
387      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
388      assertEquals(2, slw.getTaskReadySeq());
389      byte [] bytes = ZKUtil.getData(zkw, PATH2);
390      slt = SplitLogTask.parseFrom(bytes);
391      assertTrue(slt.isOwned(SRV));
392    } finally {
393      stopSplitLogWorker(slw);
394    }
395  }
396
397  @Test
398  public void testRescan() throws Exception {
399    LOG.info("testRescan");
400    SplitLogCounters.resetCounters();
401    final ServerName SRV = ServerName.valueOf("svr,1,1");
402    RegionServerServices mockedRS = getRegionServer(SRV);
403    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
404    slw.start();
405    Thread.yield(); // let the worker start
406    Thread.sleep(100);
407
408    String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
409    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
410    zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
411      CreateMode.PERSISTENT);
412
413    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
414    // now the worker is busy doing the above task
415
416    // preempt the task, have it owned by another worker
417    ZKUtil.setData(zkw, task, slt.toByteArray());
418    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
419
420    // create a RESCAN node
421    String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
422    rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
423      CreateMode.PERSISTENT_SEQUENTIAL);
424
425    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
426    // RESCAN node might not have been processed if the worker became busy
427    // with the above task. preempt the task again so that now the RESCAN
428    // node is processed
429    ZKUtil.setData(zkw, task, slt.toByteArray());
430    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
431    waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
432
433    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
434    LOG.debug(Objects.toString(nodes));
435    int num = 0;
436    for (String node : nodes) {
437      num++;
438      if (node.startsWith("RESCAN")) {
439        String name = ZKSplitLog.getEncodedNodeName(zkw, node);
440        String fn = ZKSplitLog.getFileName(name);
441        byte [] data = ZKUtil.getData(zkw,
442                ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
443        slt = SplitLogTask.parseFrom(data);
444        assertTrue(slt.toString(), slt.isDone(SRV));
445      }
446    }
447    assertEquals(2, num);
448  }
449
450  @Test
451  public void testAcquireMultiTasks() throws Exception {
452    LOG.info("testAcquireMultiTasks");
453    SplitLogCounters.resetCounters();
454    final String TATAS = "tatas";
455    final ServerName RS = ServerName.valueOf("rs,1,1");
456    final int maxTasks = 3;
457    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
458    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
459    RegionServerServices mockedRS = getRegionServer(RS);
460    for (int i = 0; i < maxTasks; i++) {
461      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
462        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
463          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
464    }
465
466    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
467    slw.start();
468    try {
469      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
470      for (int i = 0; i < maxTasks; i++) {
471        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
472        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
473        assertTrue(slt.isOwned(RS));
474      }
475    } finally {
476      stopSplitLogWorker(slw);
477    }
478  }
479
480  /**
481   * Create a mocked region server service instance
482   */
483  private RegionServerServices getRegionServer(ServerName name) {
484
485    RegionServerServices mockedServer = mock(RegionServerServices.class);
486    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
487    when(mockedServer.getServerName()).thenReturn(name);
488    when(mockedServer.getZooKeeper()).thenReturn(zkw);
489    when(mockedServer.isStopped()).thenReturn(false);
490    when(mockedServer.getExecutorService()).thenReturn(executorService);
491
492    return mockedServer;
493  }
494
495}