/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link SplitLogWorker}. Each test starts a mini ZooKeeper cluster, creates split-log
 * task znodes by hand, starts one or more workers and then observes the {@link SplitLogCounters}
 * and the znode contents to check what the workers did.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestSplitLogWorker {

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogWorker.class);
  /** Upper bound, in milliseconds, for every counter wait and worker-thread join. */
  private static final int WAIT_TIME = 15000;
  /** Server name written into task znodes as the (fake) split-log manager. */
  private static final ServerName MANAGER = ServerName.valueOf("manager,1,1");
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private DummyServer ds;
  private ZKWatcher zkw;
  private ExecutorService executorService;

  /**
   * Minimal {@link MockServer} that hands out the watcher and configuration it was built with,
   * plus a {@link ZkCoordinatedStateManager} created on top of itself.
   */
  static class DummyServer extends MockServer {
    private final ZKWatcher zkw;
    private final Configuration conf;
    private final CoordinatedStateManager cm;

    public DummyServer(ZKWatcher zkw, Configuration conf) {
      this.zkw = zkw;
      this.conf = conf;
      // Must come last: the state manager is handed this server, so zkw/conf are set first.
      this.cm = new ZkCoordinatedStateManager(this);
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return cm;
    }
  }

  /**
   * Waits for {@code ctr} to reach {@code newval} and fails the test if it does not.
   * @param ctr    counter to observe
   * @param oldval value the counter is expected to start from (only used in the failure message)
   * @param newval value the counter must reach
   * @param timems maximum time to wait, in milliseconds
   */
  private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
    throws Exception {
    assertTrue(waitForCounterBoolean(ctr, oldval, newval, timems),
      "ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval);
  }

  /**
   * Same as {@link #waitForCounterBoolean(LongAdder, long, long, long, boolean)} with
   * {@code failIfTimeout} set to {@code true}.
   */
  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
    long timems) throws Exception {
    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
  }

  /**
   * Polls every 10 ms until {@code ctr.sum() >= newval} or the timeout elapses.
   * @param ctr           counter to observe
   * @param oldval        expected starting value; not consulted by the wait itself
   * @param newval        value the counter must reach
   * @param timems        maximum time to wait, in milliseconds
   * @param failIfTimeout passed through to the waiter; when {@code true} the waiter itself is
   *                      expected to fail the test on timeout
   * @return {@code true} if the counter reached {@code newval}, {@code false} if the wait timed
   *         out
   */
  private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
    long timems, boolean failIfTimeout) throws Exception {

    long timeWaited =
      TEST_UTIL.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return ctr.sum() >= newval;
        }
      });

    if (timeWaited > 0) {
      // when not timed out
      assertEquals(newval, ctr.sum());
    }
    // NOTE(review): relies on the waiter reporting a timeout as a negative value. Returning the
    // real outcome (instead of an unconditional true) is what lets callers that pass
    // failIfTimeout=false fall back to another condition.
    return timeWaited >= 0;
  }

  /**
   * Starts the mini ZK cluster, recreates the base, split-log and rs znodes, resets the split-log
   * counters and starts an executor for {@link ExecutorType#RS_LOG_REPLAY_OPS}.
   */
  @BeforeEach
  public void setup() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    Configuration conf = TEST_UTIL.getConfiguration();
    zkw = new ZKWatcher(conf, "split-log-worker-tests", null);
    ds = new DummyServer(zkw, conf);

    ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode), not(is(-1)));
    LOG.debug("{} created", zkw.getZNodePaths().baseZNode);

    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode), not(is(-1)));
    LOG.debug("{} created", zkw.getZNodePaths().splitLogZNode);

    ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().rsZNode);
    assertThat(ZKUtil.checkExists(zkw, zkw.getZNodePaths().rsZNode), not(is(-1)));

    SplitLogCounters.resetCounters();
    executorService = new ExecutorService("TestSplitLogWorker");
    executorService.startExecutorService(executorService.new ExecutorConfig()
      .setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(10));
  }

  /** Shuts down the executor (if one was started) and the mini ZK cluster. */
  @AfterEach
  public void teardown() throws Exception {
    if (executorService != null) {
      executorService.shutdown();
    }
    TEST_UTIL.shutdownMiniZKCluster();
  }

  /**
   * Task executor that never finishes on its own: it sleeps in one-second steps and returns
   * {@code PREEMPTED} only when interrupted or when the progress callback reports {@code false}.
   */
  SplitLogWorker.TaskExecutor neverEndingTask = new SplitLogWorker.TaskExecutor() {

    @Override
    public Status exec(String name, CancelableProgressable p) {
      while (true) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          return Status.PREEMPTED;
        }
        if (!p.progress()) {
          return Status.PREEMPTED;
        }
      }
    }

  };

  /** A task that already exists when the worker starts must be acquired by that worker. */
  @Test
  public void testAcquireTaskAtStartup() throws Exception {
    LOG.info("testAcquireTaskAtStartup");
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    RegionServerServices mockedRS = getRegionServer(RS);
    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(RS));
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Stops the given worker and waits up to {@link #WAIT_TIME} ms for its thread to exit, failing
   * the test with a descriptive message if the thread is still alive afterwards.
   * @param slw worker to stop; {@code null} is ignored
   */
  private void stopSplitLogWorker(final SplitLogWorker slw) throws InterruptedException {
    if (slw == null) {
      return;
    }
    slw.stop();
    slw.worker.join(WAIT_TIME);
    assertFalse(slw.worker.isAlive(), "Could not stop the worker thread slw=" + slw);
  }

  /** Two workers compete for a single task; exactly one acquires it and the other backs off. */
  @Test
  public void testRaceForTask() throws Exception {
    LOG.info("testRaceForTask");
    SplitLogCounters.resetCounters();
    final String TRFT = "trft";
    final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
    final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
      new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);
    RegionServerServices mockedRS1 = getRegionServer(SVR1);
    RegionServerServices mockedRS2 = getRegionServer(SVR2);
    SplitLogWorker slw1 =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
    SplitLogWorker slw2 =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
    slw1.start();
    slw2.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // Assert that either the tot_wkr_failed_to_grab_task_owned counter was set or, if it was
      // not, that we fell through to the next counter in line and that one was set.
      assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
        WAIT_TIME, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
      byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
    } finally {
      stopSplitLogWorker(slw1);
      stopSplitLogWorker(slw2);
    }
  }

  /**
   * A task created after the worker started is acquired; rewriting the znode as owned by the
   * manager must then bump the preempt counter.
   */
  @Test
  public void testPreemptTask() throws Exception {
    LOG.info("testPreemptTask");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
    final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(1000);
      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

      // this time create a task node after starting the splitLogWorker
      zkw.getRecoverableZooKeeper().create(PATH, new SplitLogTask.Unassigned(MANAGER).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      assertEquals(1, slw.getTaskReadySeq());
      byte[] bytes = ZKUtil.getData(zkw, PATH);
      SplitLogTask slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SRV));

      // take the task away from the worker by marking it as owned by the manager
      slt = new SplitLogTask.Owned(MANAGER);
      ZKUtil.setData(zkw, PATH, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * While the worker is busy with a first task a second one is created; once the first task is
   * preempted the worker must acquire the second.
   */
  @Test
  public void testMultipleTasks() throws Exception {
    LOG.info("testMultipleTasks");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
    final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(100);
      waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
      zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // now the worker is busy doing the above task

      // create another task
      final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
      zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      // preempt the first task, have it owned by another worker
      final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
      ZKUtil.setData(zkw, PATH1, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
      assertEquals(2, slw.getTaskReadySeq());
      byte[] bytes = ZKUtil.getData(zkw, PATH2);
      slt = SplitLogTask.parseFrom(bytes);
      assertTrue(slt.isOwned(SRV));
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Creates a task and a RESCAN node, repeatedly resets the task to unassigned to preempt the
   * worker, and checks that the RESCAN node ends up marked done by the worker.
   */
  @Test
  public void testRescan() throws Exception {
    LOG.info("testRescan");
    SplitLogCounters.resetCounters();
    final ServerName SRV = ServerName.valueOf("svr,1,1");
    RegionServerServices mockedRS = getRegionServer(SRV);
    SplitLogWorker slw =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
    slw.start();
    try {
      Thread.yield(); // let the worker start
      Thread.sleep(100);

      String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
      SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
      zkw.getRecoverableZooKeeper().create(task, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
      // now the worker is busy doing the above task

      // preempt the task by rewriting it as unassigned
      ZKUtil.setData(zkw, task, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

      // create a RESCAN node
      String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
      rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT_SEQUENTIAL);

      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
      // RESCAN node might not have been processed if the worker became busy
      // with the above task. preempt the task again so that now the RESCAN
      // node is processed
      ZKUtil.setData(zkw, task, slt.toByteArray());
      waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);

      List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
      LOG.debug(Objects.toString(nodes));
      for (String node : nodes) {
        if (node.startsWith("RESCAN")) {
          String name = ZKSplitLog.getEncodedNodeName(zkw, node);
          String fn = ZKSplitLog.getFileName(name);
          byte[] data =
            ZKUtil.getData(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fn));
          slt = SplitLogTask.parseFrom(data);
          assertTrue(slt.isDone(SRV), slt.toString());
        }
      }
      // exactly the task node and the RESCAN node are expected under the split-log znode
      assertEquals(2, nodes.size());
    } finally {
      // stop the worker so its thread does not leak into subsequent tests
      stopSplitLogWorker(slw);
    }
  }

  /**
   * With {@code HBASE_SPLIT_WAL_MAX_SPLITTER} set to 3, a single worker must acquire all three
   * pre-created tasks.
   */
  @Test
  public void testAcquireMultiTasks() throws Exception {
    LOG.info("testAcquireMultiTasks");
    SplitLogCounters.resetCounters();
    final String TATAS = "tatas";
    final ServerName RS = ServerName.valueOf("rs,1,1");
    final int maxTasks = 3;
    // copy the shared configuration so the splitter limit does not affect other tests
    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks);
    RegionServerServices mockedRS = getRegionServer(RS);
    for (int i = 0; i < maxTasks; i++) {
      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
    slw.start();
    try {
      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
      for (int i = 0; i < maxTasks; i++) {
        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
        assertTrue(slt.isOwned(RS));
      }
    } finally {
      stopSplitLogWorker(slw);
    }
  }

  /**
   * Create a mocked region server service instance.
   * @param name server name the mock reports from {@code getServerName()}
   * @return a Mockito mock wired to this test's configuration, ZK watcher and executor service,
   *         and reporting itself as not stopped
   */
  private RegionServerServices getRegionServer(ServerName name) {
    RegionServerServices mockedServer = mock(RegionServerServices.class);
    when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
    when(mockedServer.getServerName()).thenReturn(name);
    when(mockedServer.getZooKeeper()).thenReturn(zkw);
    when(mockedServer.isStopped()).thenReturn(false);
    when(mockedServer.getExecutorService()).thenReturn(executorService);
    return mockedServer;
  }

}