/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Unit test for the peer worker bookkeeping of {@link SyncReplicationReplayWALManager}.
 * <p>
 * All master-side collaborators are Mockito mocks, so the test drives the manager purely through
 * {@code registerPeer}, {@code acquirePeerWorker}, {@code releasePeerWorker} and the
 * {@link ServerListener} that the manager registers with the mocked {@link ServerManager}.
 */
@Tag(MasterTests.TAG)
@Tag(SmallTests.TAG)
public class TestSyncReplicationReplayWALManager {

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private SyncReplicationReplayWALManager manager;

  private MasterProcedureScheduler scheduler;

  /** Backing set for the mocked {@code ServerManager.getOnlineServers()}; tests mutate it. */
  private Set<ServerName> onlineServers;

  /** Listeners captured from the mocked {@code ServerManager.registerListener(...)}. */
  private List<ServerListener> listeners;

  /** Procedures handed to the scheduler when an event is woken, in the order received. */
  private Queue<Procedure<?>> wokenProcedures;

  /**
   * Builds a {@link SyncReplicationReplayWALManager} on top of mocked master services and checks
   * that constructing it registered exactly one {@link ServerListener}.
   */
  @BeforeEach
  public void setUp() throws IOException, ReplicationException {
    wokenProcedures = new ArrayDeque<>();
    onlineServers = new HashSet<>();
    listeners = new ArrayList<>();

    // Capture registered listeners and expose onlineServers as the current online server map.
    ServerManager serverManager = mock(ServerManager.class);
    doAnswer(inv -> listeners.add(inv.getArgument(0))).when(serverManager)
      .registerListener(any(ServerListener.class));
    ServerMetrics serverMetrics = mock(ServerMetrics.class);
    doAnswer(inv -> onlineServers.stream()
      .collect(Collectors.toMap(Function.identity(), k -> serverMetrics))).when(serverManager)
        .getOnlineServers();

    MasterFileSystem mfs = mock(MasterFileSystem.class);
    when(mfs.getFileSystem()).thenReturn(UTIL.getTestFileSystem());
    when(mfs.getWALRootDir()).thenReturn(new Path("/"));

    scheduler = mock(MasterProcedureScheduler.class);
    doAnswer(new Answer<Void>() {

      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        // Only the first event of the array passed to wakeEvents is woken here.
        ProcedureEvent<?> event = ((ProcedureEvent<?>[]) invocation.getArgument(0))[0];
        // Wake through a throwaway scheduler whose addFront records the procedures instead of
        // queueing them, so the test can assert which procedures were woken and in what order.
        event.wakeInternal(new MasterProcedureScheduler(pid -> null) {

          @Override
          public void addFront(Iterator<Procedure> procedureIterator) {
            procedureIterator.forEachRemaining(wokenProcedures::add);
          }
        });
        return null;
      }
    }).when(scheduler).wakeEvents(any(ProcedureEvent[].class));
    MasterProcedureEnv env = mock(MasterProcedureEnv.class);
    when(env.getProcedureScheduler()).thenReturn(scheduler);
    // mock(Class) cannot carry the type argument; the mock is only used through stubbed calls.
    @SuppressWarnings("unchecked")
    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
    when(procExec.getEnvironment()).thenReturn(env);

    MasterServices services = mock(MasterServices.class);
    when(services.getServerManager()).thenReturn(serverManager);
    when(services.getMasterFileSystem()).thenReturn(mfs);
    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
    manager = new SyncReplicationReplayWALManager(services);
    assertEquals(1, listeners.size());
  }

  /**
   * Asserts that acquiring a worker for {@code peerId} on behalf of {@code proc} suspends the
   * procedure, i.e. that {@code acquirePeerWorker} throws {@link ProcedureSuspendedException}.
   * @param peerId the replication peer to acquire a worker for
   * @param proc   the procedure expected to be suspended
   */
  private void assertSuspends(String peerId, NoopProcedure<?> proc) {
    try {
      manager.acquirePeerWorker(peerId, proc);
      fail("Should suspend");
    } catch (ProcedureSuspendedException e) {
      // expected
    }
  }

  /**
   * Walks through acquiring and releasing workers for two peers: workers are tracked per peer, a
   * procedure is suspended once no worker can be acquired for its peer, and suspended procedures
   * are woken when a worker is released or a new server is reported through the listener.
   */
  @Test
  public void testUsedWorkers() throws ProcedureSuspendedException {
    String peerId1 = "1";
    String peerId2 = "2";
    ServerName sn1 = ServerName.valueOf("host1", 123, 12345);
    ServerName sn2 = ServerName.valueOf("host2", 234, 23456);
    ServerName sn3 = ServerName.valueOf("host3", 345, 34567);
    onlineServers.add(sn1);
    manager.registerPeer(peerId1);
    manager.registerPeer(peerId2);
    // confirm that different peer ids do not affect each other
    assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
    assertEquals(sn1, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
    onlineServers.add(sn2);
    assertEquals(sn2, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
    assertEquals(sn2, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));

    // Both online servers are already held by peer 1, so the next acquire must suspend.
    NoopProcedure<?> proc = new NoopProcedure<>();
    assertSuspends(peerId1, proc);
    manager.releasePeerWorker(peerId1, sn1, scheduler);
    assertEquals(1, wokenProcedures.size());
    assertSame(proc, wokenProcedures.poll());

    assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));

    NoopProcedure<?> proc1 = new NoopProcedure<>();
    NoopProcedure<?> proc2 = new NoopProcedure<>();
    assertSuspends(peerId1, proc1);
    assertSuspends(peerId1, proc2);

    // A newly added server wakes both suspended procedures; proc2 is handed over before proc1.
    listeners.get(0).serverAdded(sn3);
    assertEquals(2, wokenProcedures.size());
    assertSame(proc2, wokenProcedures.poll());
    assertSame(proc1, wokenProcedures.poll());
  }
}