001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.master;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.emptyIterable;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertSame;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.when;
028
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.stream.Collectors;
036import java.util.stream.Stream;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseConfiguration;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.client.AsyncClusterConnection;
044import org.apache.hadoop.hbase.master.HMaster;
045import org.apache.hadoop.hbase.master.MasterServices;
046import org.apache.hadoop.hbase.master.ServerManager;
047import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
048import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
049import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
050import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
051import org.apache.hadoop.hbase.replication.ReplicationException;
052import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
053import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
054import org.apache.hadoop.hbase.replication.ReplicationQueueData;
055import org.apache.hadoop.hbase.replication.ReplicationQueueId;
056import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
057import org.apache.hadoop.hbase.testclassification.MasterTests;
058import org.apache.hadoop.hbase.testclassification.SmallTests;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
061import org.junit.After;
062import org.junit.Before;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.mockito.Mockito;
067
068import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
069
070@Category({ MasterTests.class, SmallTests.class })
071public class TestReplicationLogCleaner {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestReplicationLogCleaner.class);
076
077  private static final Configuration CONF = HBaseConfiguration.create();
078
079  private MasterServices services;
080
081  private ReplicationLogCleaner cleaner;
082
083  private ReplicationPeerManager rpm;
084
085  @Before
086  public void setUp() throws ReplicationException {
087    services = mock(MasterServices.class);
088    when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
089    AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class);
090    when(services.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
091    when(asyncClusterConnection.isClosed()).thenReturn(false);
092    rpm = mock(ReplicationPeerManager.class);
093    when(services.getReplicationPeerManager()).thenReturn(rpm);
094    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
095    ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
096    when(rpm.getQueueStorage()).thenReturn(rqs);
097    when(rqs.hasData()).thenReturn(true);
098    when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
099    ServerManager sm = mock(ServerManager.class);
100    when(services.getServerManager()).thenReturn(sm);
101    when(sm.getOnlineServersList()).thenReturn(new ArrayList<>());
102    @SuppressWarnings("unchecked")
103    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
104    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
105    when(procExec.getProcedures()).thenReturn(new ArrayList<>());
106
107    cleaner = new ReplicationLogCleaner();
108    cleaner.setConf(CONF);
109    Map<String, Object> params = ImmutableMap.of(HMaster.MASTER, services);
110    cleaner.init(params);
111  }
112
113  @After
114  public void tearDown() {
115    cleaner.postClean();
116  }
117
118  private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner,
119    Iterable<FileStatus> files) {
120    cleaner.preClean();
121    return cleaner.getDeletableFiles(files);
122  }
123
124  private static FileStatus createFileStatus(Path path) {
125    return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path);
126  }
127
128  private static FileStatus createFileStatus(ServerName sn, int number) {
129    Path path = new Path(sn.toString() + "." + number);
130    return createFileStatus(path);
131  }
132
133  private static ReplicationPeerDescription createPeer(String peerId) {
134    return new ReplicationPeerDescription(peerId, true, null, null);
135  }
136
137  private void addServer(ServerName serverName) {
138    services.getServerManager().getOnlineServersList().add(serverName);
139  }
140
141  private void addSCP(ServerName serverName, boolean finished) {
142    ServerCrashProcedure scp = mock(ServerCrashProcedure.class);
143    when(scp.getServerName()).thenReturn(serverName);
144    when(scp.isFinished()).thenReturn(finished);
145    services.getMasterProcedureExecutor().getProcedures().add(scp);
146  }
147
148  private void addPeer(String... peerIds) {
149    services.getReplicationPeerManager().listPeers(null).addAll(
150      Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList()));
151  }
152
153  private void addQueueData(ReplicationQueueData... datas) throws ReplicationException {
154    services.getReplicationPeerManager().getQueueStorage().listAllQueues()
155      .addAll(Arrays.asList(datas));
156  }
157
158  @Test
159  public void testNoConf() {
160    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
161    List<FileStatus> files = Arrays.asList(new FileStatus());
162    assertSame(files, runCleaner(cleaner, files));
163    cleaner.postClean();
164  }
165
166  @Test
167  public void testCanNotFilter() {
168    assertTrue(services.getReplicationLogCleanerBarrier().disable());
169    List<FileStatus> files = Arrays.asList(new FileStatus());
170    assertSame(Collections.emptyList(), runCleaner(cleaner, files));
171  }
172
173  @Test
174  public void testNoPeer() {
175    Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime());
176    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
177    FileStatus file = createFileStatus(path);
178    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
179    assertSame(file, iter.next());
180    assertFalse(iter.hasNext());
181  }
182
183  @Test
184  public void testNotValidWalFile() {
185    addPeer("1");
186    Path path = new Path("/whatever");
187    assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName()));
188    FileStatus file = createFileStatus(path);
189    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
190    assertSame(file, iter.next());
191    assertFalse(iter.hasNext());
192  }
193
194  @Test
195  public void testMetaWalFile() {
196    addPeer("1");
197    Path path = new Path(
198      "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID);
199    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
200    assertTrue(AbstractFSWALProvider.isMetaFile(path));
201    FileStatus file = createFileStatus(path);
202    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
203    assertSame(file, iter.next());
204    assertFalse(iter.hasNext());
205  }
206
207  @Test
208  public void testLiveRegionServerNoQueues() {
209    addPeer("1");
210    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
211    addServer(sn);
212    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
213    assertThat(runCleaner(cleaner, files), emptyIterable());
214  }
215
216  @Test
217  public void testLiveRegionServerWithSCPNoQueues() {
218    addPeer("1");
219    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
220    addSCP(sn, false);
221    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
222    assertThat(runCleaner(cleaner, files), emptyIterable());
223  }
224
225  @Test
226  public void testDeadRegionServerNoQueues() {
227    addPeer("1");
228    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
229    FileStatus file = createFileStatus(sn, 1);
230    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
231    assertSame(file, iter.next());
232    assertFalse(iter.hasNext());
233  }
234
235  @Test
236  public void testDeadRegionServerWithSCPNoQueues() {
237    addPeer("1");
238    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
239    addSCP(sn, true);
240    FileStatus file = createFileStatus(sn, 1);
241    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
242    assertSame(file, iter.next());
243    assertFalse(iter.hasNext());
244  }
245
246  @Test
247  public void testLiveRegionServerMissingQueue() throws ReplicationException {
248    String peerId1 = "1";
249    String peerId2 = "2";
250    addPeer(peerId1, peerId2);
251    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
252    addServer(sn);
253    FileStatus file = createFileStatus(sn, 1);
254    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
255      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
256    addQueueData(data1);
257    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
258  }
259
260  @Test
261  public void testLiveRegionServerShouldNotDelete() throws ReplicationException {
262    String peerId = "1";
263    addPeer(peerId);
264    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
265    addServer(sn);
266    FileStatus file = createFileStatus(sn, 1);
267    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
268      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
269    addQueueData(data);
270    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
271  }
272
273  @Test
274  public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
275    String peerId1 = "1";
276    String peerId2 = "2";
277    addPeer(peerId1, peerId2);
278    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
279    addServer(sn);
280    FileStatus file = createFileStatus(sn, 1);
281    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
282      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
283    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
284      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
285    addQueueData(data1, data2);
286    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
287  }
288
289  @Test
290  public void testLiveRegionServerShouldDelete() throws ReplicationException {
291    String peerId = "1";
292    addPeer(peerId);
293    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
294    addServer(sn);
295    FileStatus file = createFileStatus(sn, 1);
296    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
297      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
298    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
299    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
300    assertSame(file, iter.next());
301    assertFalse(iter.hasNext());
302  }
303
304  @Test
305  public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException {
306    String peerId1 = "1";
307    String peerId2 = "2";
308    addPeer(peerId1, peerId2);
309    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
310    addServer(sn);
311    FileStatus file = createFileStatus(sn, 1);
312    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
313      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
314    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
315      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
316    addQueueData(data1, data2);
317    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
318    assertSame(file, iter.next());
319    assertFalse(iter.hasNext());
320  }
321
322  @Test
323  public void testDeadRegionServerMissingQueue() throws ReplicationException {
324    String peerId1 = "1";
325    String peerId2 = "2";
326    addPeer(peerId1, peerId2);
327    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
328    FileStatus file = createFileStatus(sn, 1);
329    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
330      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
331    addQueueData(data1);
332    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
333    assertSame(file, iter.next());
334    assertFalse(iter.hasNext());
335  }
336
337  @Test
338  public void testDeadRegionServerShouldNotDelete() throws ReplicationException {
339    String peerId = "1";
340    addPeer(peerId);
341    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
342    FileStatus file = createFileStatus(sn, 1);
343    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
344      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
345    addQueueData(data);
346    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
347  }
348
349  @Test
350  public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
351    String peerId1 = "1";
352    String peerId2 = "2";
353    addPeer(peerId1, peerId2);
354    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
355    FileStatus file = createFileStatus(sn, 1);
356    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
357      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
358    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
359      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
360    addQueueData(data1, data2);
361    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
362  }
363
364  @Test
365  public void testDeadRegionServerShouldDelete() throws ReplicationException {
366    String peerId = "1";
367    addPeer(peerId);
368    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
369    FileStatus file = createFileStatus(sn, 1);
370    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
371      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
372    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
373    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
374    assertSame(file, iter.next());
375    assertFalse(iter.hasNext());
376  }
377
378  @Test
379  public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException {
380    String peerId1 = "1";
381    String peerId2 = "2";
382    addPeer(peerId1, peerId2);
383    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
384    FileStatus file = createFileStatus(sn, 1);
385    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
386      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
387    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
388      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
389    addQueueData(data1, data2);
390    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
391    assertSame(file, iter.next());
392    assertFalse(iter.hasNext());
393  }
394
395  @Test
396  public void testPreCleanWhenAsyncClusterConnectionClosed() throws ReplicationException {
397    assertFalse(services.getAsyncClusterConnection().isClosed());
398    verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
399    cleaner.preClean();
400    verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
401    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
402
403    when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
404    assertTrue(services.getAsyncClusterConnection().isClosed());
405    verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
406    cleaner.preClean();
407    verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
408    // rpm.getQueueStorage().hasData() was not executed, indicating an early return.
409    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
410  }
411
412  @Test
413  public void testGetDeletableFilesWhenAsyncClusterConnectionClosed() throws ReplicationException {
414    List<FileStatus> files = List.of(new FileStatus());
415    assertFalse(services.getAsyncClusterConnection().isClosed());
416    verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
417    cleaner.getDeletableFiles(files);
418    verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
419    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
420
421    when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
422    assertTrue(services.getAsyncClusterConnection().isClosed());
423    verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
424    cleaner.getDeletableFiles(files);
425    verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
426    // rpm.getQueueStorage().hasData() was not executed, indicating an early return.
427    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
428  }
429}