001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertSame;
022import static org.junit.jupiter.api.Assertions.fail;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.Mockito.doAnswer;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.util.ArrayDeque;
030import java.util.ArrayList;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Queue;
035import java.util.Set;
036import java.util.function.Function;
037import java.util.stream.Collectors;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.ServerMetrics;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.master.MasterFileSystem;
043import org.apache.hadoop.hbase.master.MasterServices;
044import org.apache.hadoop.hbase.master.ServerListener;
045import org.apache.hadoop.hbase.master.ServerManager;
046import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
047import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
048import org.apache.hadoop.hbase.procedure2.Procedure;
049import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
050import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
051import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
052import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
053import org.apache.hadoop.hbase.replication.ReplicationException;
054import org.apache.hadoop.hbase.testclassification.MasterTests;
055import org.apache.hadoop.hbase.testclassification.SmallTests;
056import org.junit.jupiter.api.BeforeEach;
057import org.junit.jupiter.api.Tag;
058import org.junit.jupiter.api.Test;
059import org.mockito.invocation.InvocationOnMock;
060import org.mockito.stubbing.Answer;
061
062@Tag(MasterTests.TAG)
063@Tag(SmallTests.TAG)
064public class TestSyncReplicationReplayWALManager {
065
066  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
067
068  private SyncReplicationReplayWALManager manager;
069
070  private MasterProcedureScheduler scheduler;
071
072  private Set<ServerName> onlineServers;
073
074  private List<ServerListener> listeners;
075
076  private Queue<Procedure<?>> wokenProcedures;
077
078  @BeforeEach
079  public void setUp() throws IOException, ReplicationException {
080    wokenProcedures = new ArrayDeque<>();
081    onlineServers = new HashSet<>();
082    listeners = new ArrayList<>();
083    ServerManager serverManager = mock(ServerManager.class);
084    doAnswer(inv -> listeners.add(inv.getArgument(0))).when(serverManager)
085      .registerListener(any(ServerListener.class));
086    ServerMetrics serverMetrics = mock(ServerMetrics.class);
087    doAnswer(inv -> onlineServers.stream()
088      .collect(Collectors.toMap(Function.identity(), k -> serverMetrics))).when(serverManager)
089      .getOnlineServers();
090
091    MasterFileSystem mfs = mock(MasterFileSystem.class);
092    when(mfs.getFileSystem()).thenReturn(UTIL.getTestFileSystem());
093    when(mfs.getWALRootDir()).thenReturn(new Path("/"));
094
095    scheduler = mock(MasterProcedureScheduler.class);
096    doAnswer(new Answer<Void>() {
097
098      @Override
099      public Void answer(InvocationOnMock invocation) throws Throwable {
100        ProcedureEvent<?> event = ((ProcedureEvent<?>[]) invocation.getArgument(0))[0];
101        event.wakeInternal(new MasterProcedureScheduler(pid -> null) {
102
103          @Override
104          public void addFront(Iterator<Procedure> procedureIterator) {
105            procedureIterator.forEachRemaining(wokenProcedures::add);
106          }
107        });
108        return null;
109      }
110    }).when(scheduler).wakeEvents(any(ProcedureEvent[].class));
111    MasterProcedureEnv env = mock(MasterProcedureEnv.class);
112    when(env.getProcedureScheduler()).thenReturn(scheduler);
113    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
114    when(procExec.getEnvironment()).thenReturn(env);
115
116    MasterServices services = mock(MasterServices.class);
117    when(services.getServerManager()).thenReturn(serverManager);
118    when(services.getMasterFileSystem()).thenReturn(mfs);
119    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
120    manager = new SyncReplicationReplayWALManager(services);
121    assertEquals(1, listeners.size());
122  }
123
124  @Test
125  public void testUsedWorkers() throws ProcedureSuspendedException {
126    String peerId1 = "1";
127    String peerId2 = "2";
128    ServerName sn1 = ServerName.valueOf("host1", 123, 12345);
129    ServerName sn2 = ServerName.valueOf("host2", 234, 23456);
130    ServerName sn3 = ServerName.valueOf("host3", 345, 34567);
131    onlineServers.add(sn1);
132    manager.registerPeer(peerId1);
133    manager.registerPeer(peerId2);
134    // confirm that different peer ids does not affect each other
135    assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
136    assertEquals(sn1, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
137    onlineServers.add(sn2);
138    assertEquals(sn2, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
139    assertEquals(sn2, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
140
141    NoopProcedure<?> proc = new NoopProcedure<>();
142    try {
143      manager.acquirePeerWorker(peerId1, proc);
144      fail("Should suspend");
145    } catch (ProcedureSuspendedException e) {
146      // expected
147    }
148    manager.releasePeerWorker(peerId1, sn1, scheduler);
149    assertEquals(1, wokenProcedures.size());
150    assertSame(proc, wokenProcedures.poll());
151
152    assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
153
154    NoopProcedure<?> proc1 = new NoopProcedure<>();
155    NoopProcedure<?> proc2 = new NoopProcedure<>();
156    try {
157      manager.acquirePeerWorker(peerId1, proc1);
158      fail("Should suspend");
159    } catch (ProcedureSuspendedException e) {
160      // expected
161    }
162    try {
163      manager.acquirePeerWorker(peerId1, proc2);
164      fail("Should suspend");
165    } catch (ProcedureSuspendedException e) {
166      // expected
167    }
168
169    listeners.get(0).serverAdded(sn3);
170    assertEquals(2, wokenProcedures.size());
171    assertSame(proc2, wokenProcedures.poll());
172    assertSame(proc1, wokenProcedures.poll());
173  }
174}