001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.master;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.emptyIterable;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertSame;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.stream.Collectors;
036import java.util.stream.Stream;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.client.AsyncClusterConnection;
043import org.apache.hadoop.hbase.master.HMaster;
044import org.apache.hadoop.hbase.master.MasterServices;
045import org.apache.hadoop.hbase.master.ServerManager;
046import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
047import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
048import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
049import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
050import org.apache.hadoop.hbase.replication.ReplicationException;
051import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
052import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
053import org.apache.hadoop.hbase.replication.ReplicationQueueData;
054import org.apache.hadoop.hbase.replication.ReplicationQueueId;
055import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.testclassification.SmallTests;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
060import org.junit.jupiter.api.AfterEach;
061import org.junit.jupiter.api.BeforeEach;
062import org.junit.jupiter.api.Tag;
063import org.junit.jupiter.api.Test;
064import org.mockito.Mockito;
065
066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
067
068@Tag(MasterTests.TAG)
069@Tag(SmallTests.TAG)
070public class TestReplicationLogCleaner {
071
072  private static final Configuration CONF = HBaseConfiguration.create();
073
074  private MasterServices services;
075
076  private ReplicationLogCleaner cleaner;
077
078  private ReplicationPeerManager rpm;
079
080  @BeforeEach
081  public void setUp() throws ReplicationException {
082    services = mock(MasterServices.class);
083    when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
084    AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class);
085    when(services.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
086    when(asyncClusterConnection.isClosed()).thenReturn(false);
087    rpm = mock(ReplicationPeerManager.class);
088    when(services.getReplicationPeerManager()).thenReturn(rpm);
089    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
090    ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
091    when(rpm.getQueueStorage()).thenReturn(rqs);
092    when(rqs.hasData()).thenReturn(true);
093    when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
094    ServerManager sm = mock(ServerManager.class);
095    when(services.getServerManager()).thenReturn(sm);
096    when(sm.getOnlineServersList()).thenReturn(new ArrayList<>());
097    @SuppressWarnings("unchecked")
098    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
099    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
100    when(procExec.getProcedures()).thenReturn(new ArrayList<>());
101
102    cleaner = new ReplicationLogCleaner();
103    cleaner.setConf(CONF);
104    Map<String, Object> params = ImmutableMap.of(HMaster.MASTER, services);
105    cleaner.init(params);
106  }
107
108  @AfterEach
109  public void tearDown() {
110    cleaner.postClean();
111  }
112
113  private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner,
114    Iterable<FileStatus> files) {
115    cleaner.preClean();
116    return cleaner.getDeletableFiles(files);
117  }
118
119  private static FileStatus createFileStatus(Path path) {
120    return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path);
121  }
122
123  private static FileStatus createFileStatus(ServerName sn, int number) {
124    Path path = new Path(sn.toString() + "." + number);
125    return createFileStatus(path);
126  }
127
128  private static ReplicationPeerDescription createPeer(String peerId) {
129    return new ReplicationPeerDescription(peerId, true, null, null);
130  }
131
132  private void addServer(ServerName serverName) {
133    services.getServerManager().getOnlineServersList().add(serverName);
134  }
135
136  private void addSCP(ServerName serverName, boolean finished) {
137    ServerCrashProcedure scp = mock(ServerCrashProcedure.class);
138    when(scp.getServerName()).thenReturn(serverName);
139    when(scp.isFinished()).thenReturn(finished);
140    services.getMasterProcedureExecutor().getProcedures().add(scp);
141  }
142
143  private void addPeer(String... peerIds) {
144    services.getReplicationPeerManager().listPeers(null).addAll(
145      Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList()));
146  }
147
148  private void addQueueData(ReplicationQueueData... datas) throws ReplicationException {
149    services.getReplicationPeerManager().getQueueStorage().listAllQueues()
150      .addAll(Arrays.asList(datas));
151  }
152
153  @Test
154  public void testNoConf() {
155    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
156    List<FileStatus> files = Arrays.asList(new FileStatus());
157    assertSame(files, runCleaner(cleaner, files));
158    cleaner.postClean();
159  }
160
161  @Test
162  public void testCanNotFilter() {
163    assertTrue(services.getReplicationLogCleanerBarrier().disable());
164    List<FileStatus> files = Arrays.asList(new FileStatus());
165    assertSame(Collections.emptyList(), runCleaner(cleaner, files));
166  }
167
168  @Test
169  public void testNoPeer() {
170    Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime());
171    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
172    FileStatus file = createFileStatus(path);
173    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
174    assertSame(file, iter.next());
175    assertFalse(iter.hasNext());
176  }
177
178  @Test
179  public void testNotValidWalFile() {
180    addPeer("1");
181    Path path = new Path("/whatever");
182    assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName()));
183    FileStatus file = createFileStatus(path);
184    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
185    assertSame(file, iter.next());
186    assertFalse(iter.hasNext());
187  }
188
189  @Test
190  public void testMetaWalFile() {
191    addPeer("1");
192    Path path = new Path(
193      "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID);
194    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
195    assertTrue(AbstractFSWALProvider.isMetaFile(path));
196    FileStatus file = createFileStatus(path);
197    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
198    assertSame(file, iter.next());
199    assertFalse(iter.hasNext());
200  }
201
202  @Test
203  public void testLiveRegionServerNoQueues() {
204    addPeer("1");
205    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
206    addServer(sn);
207    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
208    assertThat(runCleaner(cleaner, files), emptyIterable());
209  }
210
211  @Test
212  public void testLiveRegionServerWithSCPNoQueues() {
213    addPeer("1");
214    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
215    addSCP(sn, false);
216    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
217    assertThat(runCleaner(cleaner, files), emptyIterable());
218  }
219
220  @Test
221  public void testDeadRegionServerNoQueues() {
222    addPeer("1");
223    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
224    FileStatus file = createFileStatus(sn, 1);
225    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
226    assertSame(file, iter.next());
227    assertFalse(iter.hasNext());
228  }
229
230  @Test
231  public void testDeadRegionServerWithSCPNoQueues() {
232    addPeer("1");
233    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
234    addSCP(sn, true);
235    FileStatus file = createFileStatus(sn, 1);
236    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
237    assertSame(file, iter.next());
238    assertFalse(iter.hasNext());
239  }
240
241  @Test
242  public void testLiveRegionServerMissingQueue() throws ReplicationException {
243    String peerId1 = "1";
244    String peerId2 = "2";
245    addPeer(peerId1, peerId2);
246    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
247    addServer(sn);
248    FileStatus file = createFileStatus(sn, 1);
249    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
250      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
251    addQueueData(data1);
252    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
253  }
254
255  @Test
256  public void testLiveRegionServerShouldNotDelete() throws ReplicationException {
257    String peerId = "1";
258    addPeer(peerId);
259    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
260    addServer(sn);
261    FileStatus file = createFileStatus(sn, 1);
262    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
263      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
264    addQueueData(data);
265    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
266  }
267
268  @Test
269  public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
270    String peerId1 = "1";
271    String peerId2 = "2";
272    addPeer(peerId1, peerId2);
273    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
274    addServer(sn);
275    FileStatus file = createFileStatus(sn, 1);
276    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
277      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
278    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
279      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
280    addQueueData(data1, data2);
281    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
282  }
283
284  @Test
285  public void testLiveRegionServerShouldDelete() throws ReplicationException {
286    String peerId = "1";
287    addPeer(peerId);
288    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
289    addServer(sn);
290    FileStatus file = createFileStatus(sn, 1);
291    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
292      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
293    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
294    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
295    assertSame(file, iter.next());
296    assertFalse(iter.hasNext());
297  }
298
299  @Test
300  public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException {
301    String peerId1 = "1";
302    String peerId2 = "2";
303    addPeer(peerId1, peerId2);
304    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
305    addServer(sn);
306    FileStatus file = createFileStatus(sn, 1);
307    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
308      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
309    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
310      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
311    addQueueData(data1, data2);
312    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
313    assertSame(file, iter.next());
314    assertFalse(iter.hasNext());
315  }
316
317  @Test
318  public void testDeadRegionServerMissingQueue() throws ReplicationException {
319    String peerId1 = "1";
320    String peerId2 = "2";
321    addPeer(peerId1, peerId2);
322    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
323    FileStatus file = createFileStatus(sn, 1);
324    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
325      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
326    addQueueData(data1);
327    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
328    assertSame(file, iter.next());
329    assertFalse(iter.hasNext());
330  }
331
332  @Test
333  public void testDeadRegionServerShouldNotDelete() throws ReplicationException {
334    String peerId = "1";
335    addPeer(peerId);
336    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
337    FileStatus file = createFileStatus(sn, 1);
338    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
339      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
340    addQueueData(data);
341    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
342  }
343
344  @Test
345  public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
346    String peerId1 = "1";
347    String peerId2 = "2";
348    addPeer(peerId1, peerId2);
349    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
350    FileStatus file = createFileStatus(sn, 1);
351    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
352      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
353    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
354      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
355    addQueueData(data1, data2);
356    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
357  }
358
359  @Test
360  public void testDeadRegionServerShouldDelete() throws ReplicationException {
361    String peerId = "1";
362    addPeer(peerId);
363    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
364    FileStatus file = createFileStatus(sn, 1);
365    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
366      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
367    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
368    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
369    assertSame(file, iter.next());
370    assertFalse(iter.hasNext());
371  }
372
373  @Test
374  public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException {
375    String peerId1 = "1";
376    String peerId2 = "2";
377    addPeer(peerId1, peerId2);
378    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
379    FileStatus file = createFileStatus(sn, 1);
380    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
381      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
382    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
383      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
384    addQueueData(data1, data2);
385    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
386    assertSame(file, iter.next());
387    assertFalse(iter.hasNext());
388  }
389
390  @Test
391  public void testPreCleanWhenAsyncClusterConnectionClosed() throws ReplicationException {
392    assertFalse(services.getAsyncClusterConnection().isClosed());
393    verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
394    cleaner.preClean();
395    verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
396    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
397
398    when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
399    assertTrue(services.getAsyncClusterConnection().isClosed());
400    verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
401    cleaner.preClean();
402    verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
403    // rpm.getQueueStorage().hasData() was not executed, indicating an early return.
404    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
405  }
406
407  @Test
408  public void testGetDeletableFilesWhenAsyncClusterConnectionClosed() throws ReplicationException {
409    List<FileStatus> files = List.of(new FileStatus());
410    assertFalse(services.getAsyncClusterConnection().isClosed());
411    verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
412    cleaner.getDeletableFiles(files);
413    verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
414    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
415
416    when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
417    assertTrue(services.getAsyncClusterConnection().isClosed());
418    verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
419    cleaner.getDeletableFiles(files);
420    verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
421    // rpm.getQueueStorage().hasData() was not executed, indicating an early return.
422    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
423  }
424}