/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.replication.master;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.emptyIterable;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertSame;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.stream.Collectors;
035import java.util.stream.Stream;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.master.HMaster;
043import org.apache.hadoop.hbase.master.MasterServices;
044import org.apache.hadoop.hbase.master.ServerManager;
045import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
046import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
047import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
048import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
049import org.apache.hadoop.hbase.replication.ReplicationException;
050import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
051import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
052import org.apache.hadoop.hbase.replication.ReplicationQueueData;
053import org.apache.hadoop.hbase.replication.ReplicationQueueId;
054import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
055import org.apache.hadoop.hbase.testclassification.MasterTests;
056import org.apache.hadoop.hbase.testclassification.SmallTests;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
059import org.junit.After;
060import org.junit.Before;
061import org.junit.ClassRule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064
065import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
066
067@Category({ MasterTests.class, SmallTests.class })
068public class TestReplicationLogCleaner {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestReplicationLogCleaner.class);
073
074  private static final Configuration CONF = HBaseConfiguration.create();
075
076  private MasterServices services;
077
078  private ReplicationLogCleaner cleaner;
079
080  @Before
081  public void setUp() throws ReplicationException {
082    services = mock(MasterServices.class);
083    when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
084    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
085    when(services.getReplicationPeerManager()).thenReturn(rpm);
086    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
087    ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
088    when(rpm.getQueueStorage()).thenReturn(rqs);
089    when(rpm.getQueueStorage().hasData()).thenReturn(true);
090    when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
091    ServerManager sm = mock(ServerManager.class);
092    when(services.getServerManager()).thenReturn(sm);
093    when(sm.getOnlineServersList()).thenReturn(new ArrayList<>());
094    @SuppressWarnings("unchecked")
095    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
096    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
097    when(procExec.getProcedures()).thenReturn(new ArrayList<>());
098
099    cleaner = new ReplicationLogCleaner();
100    cleaner.setConf(CONF);
101    Map<String, Object> params = ImmutableMap.of(HMaster.MASTER, services);
102    cleaner.init(params);
103  }
104
105  @After
106  public void tearDown() {
107    cleaner.postClean();
108  }
109
110  private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner,
111    Iterable<FileStatus> files) {
112    cleaner.preClean();
113    return cleaner.getDeletableFiles(files);
114  }
115
116  private static FileStatus createFileStatus(Path path) {
117    return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path);
118  }
119
120  private static FileStatus createFileStatus(ServerName sn, int number) {
121    Path path = new Path(sn.toString() + "." + number);
122    return createFileStatus(path);
123  }
124
125  private static ReplicationPeerDescription createPeer(String peerId) {
126    return new ReplicationPeerDescription(peerId, true, null, null);
127  }
128
129  private void addServer(ServerName serverName) {
130    services.getServerManager().getOnlineServersList().add(serverName);
131  }
132
133  private void addSCP(ServerName serverName, boolean finished) {
134    ServerCrashProcedure scp = mock(ServerCrashProcedure.class);
135    when(scp.getServerName()).thenReturn(serverName);
136    when(scp.isFinished()).thenReturn(finished);
137    services.getMasterProcedureExecutor().getProcedures().add(scp);
138  }
139
140  private void addPeer(String... peerIds) {
141    services.getReplicationPeerManager().listPeers(null).addAll(
142      Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList()));
143  }
144
145  private void addQueueData(ReplicationQueueData... datas) throws ReplicationException {
146    services.getReplicationPeerManager().getQueueStorage().listAllQueues()
147      .addAll(Arrays.asList(datas));
148  }
149
150  @Test
151  public void testNoConf() {
152    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
153    List<FileStatus> files = Arrays.asList(new FileStatus());
154    assertSame(files, runCleaner(cleaner, files));
155    cleaner.postClean();
156  }
157
158  @Test
159  public void testCanNotFilter() {
160    assertTrue(services.getReplicationLogCleanerBarrier().disable());
161    List<FileStatus> files = Arrays.asList(new FileStatus());
162    assertSame(Collections.emptyList(), runCleaner(cleaner, files));
163  }
164
165  @Test
166  public void testNoPeer() {
167    Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime());
168    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
169    FileStatus file = createFileStatus(path);
170    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
171    assertSame(file, iter.next());
172    assertFalse(iter.hasNext());
173  }
174
175  @Test
176  public void testNotValidWalFile() {
177    addPeer("1");
178    Path path = new Path("/whatever");
179    assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName()));
180    FileStatus file = createFileStatus(path);
181    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
182    assertSame(file, iter.next());
183    assertFalse(iter.hasNext());
184  }
185
186  @Test
187  public void testMetaWalFile() {
188    addPeer("1");
189    Path path = new Path(
190      "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID);
191    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
192    assertTrue(AbstractFSWALProvider.isMetaFile(path));
193    FileStatus file = createFileStatus(path);
194    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
195    assertSame(file, iter.next());
196    assertFalse(iter.hasNext());
197  }
198
199  @Test
200  public void testLiveRegionServerNoQueues() {
201    addPeer("1");
202    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
203    addServer(sn);
204    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
205    assertThat(runCleaner(cleaner, files), emptyIterable());
206  }
207
208  @Test
209  public void testLiveRegionServerWithSCPNoQueues() {
210    addPeer("1");
211    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
212    addSCP(sn, false);
213    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
214    assertThat(runCleaner(cleaner, files), emptyIterable());
215  }
216
217  @Test
218  public void testDeadRegionServerNoQueues() {
219    addPeer("1");
220    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
221    FileStatus file = createFileStatus(sn, 1);
222    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
223    assertSame(file, iter.next());
224    assertFalse(iter.hasNext());
225  }
226
227  @Test
228  public void testDeadRegionServerWithSCPNoQueues() {
229    addPeer("1");
230    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
231    addSCP(sn, true);
232    FileStatus file = createFileStatus(sn, 1);
233    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
234    assertSame(file, iter.next());
235    assertFalse(iter.hasNext());
236  }
237
238  @Test
239  public void testLiveRegionServerMissingQueue() throws ReplicationException {
240    String peerId1 = "1";
241    String peerId2 = "2";
242    addPeer(peerId1, peerId2);
243    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
244    addServer(sn);
245    FileStatus file = createFileStatus(sn, 1);
246    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
247      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
248    addQueueData(data1);
249    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
250  }
251
252  @Test
253  public void testLiveRegionServerShouldNotDelete() throws ReplicationException {
254    String peerId = "1";
255    addPeer(peerId);
256    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
257    addServer(sn);
258    FileStatus file = createFileStatus(sn, 1);
259    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
260      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
261    addQueueData(data);
262    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
263  }
264
265  @Test
266  public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
267    String peerId1 = "1";
268    String peerId2 = "2";
269    addPeer(peerId1, peerId2);
270    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
271    addServer(sn);
272    FileStatus file = createFileStatus(sn, 1);
273    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
274      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
275    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
276      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
277    addQueueData(data1, data2);
278    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
279  }
280
281  @Test
282  public void testLiveRegionServerShouldDelete() throws ReplicationException {
283    String peerId = "1";
284    addPeer(peerId);
285    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
286    addServer(sn);
287    FileStatus file = createFileStatus(sn, 1);
288    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
289      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
290    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
291    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
292    assertSame(file, iter.next());
293    assertFalse(iter.hasNext());
294  }
295
296  @Test
297  public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException {
298    String peerId1 = "1";
299    String peerId2 = "2";
300    addPeer(peerId1, peerId2);
301    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
302    addServer(sn);
303    FileStatus file = createFileStatus(sn, 1);
304    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
305      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
306    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
307      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
308    addQueueData(data1, data2);
309    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
310    assertSame(file, iter.next());
311    assertFalse(iter.hasNext());
312  }
313
314  @Test
315  public void testDeadRegionServerMissingQueue() throws ReplicationException {
316    String peerId1 = "1";
317    String peerId2 = "2";
318    addPeer(peerId1, peerId2);
319    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
320    FileStatus file = createFileStatus(sn, 1);
321    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
322      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
323    addQueueData(data1);
324    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
325    assertSame(file, iter.next());
326    assertFalse(iter.hasNext());
327  }
328
329  @Test
330  public void testDeadRegionServerShouldNotDelete() throws ReplicationException {
331    String peerId = "1";
332    addPeer(peerId);
333    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
334    FileStatus file = createFileStatus(sn, 1);
335    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
336      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
337    addQueueData(data);
338    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
339  }
340
341  @Test
342  public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
343    String peerId1 = "1";
344    String peerId2 = "2";
345    addPeer(peerId1, peerId2);
346    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
347    FileStatus file = createFileStatus(sn, 1);
348    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
349      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
350    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
351      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
352    addQueueData(data1, data2);
353    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
354  }
355
356  @Test
357  public void testDeadRegionServerShouldDelete() throws ReplicationException {
358    String peerId = "1";
359    addPeer(peerId);
360    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
361    FileStatus file = createFileStatus(sn, 1);
362    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
363      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
364    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
365    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
366    assertSame(file, iter.next());
367    assertFalse(iter.hasNext());
368  }
369
370  @Test
371  public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException {
372    String peerId1 = "1";
373    String peerId2 = "2";
374    addPeer(peerId1, peerId2);
375    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
376    FileStatus file = createFileStatus(sn, 1);
377    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
378      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
379    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
380      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
381    addQueueData(data1, data2);
382    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
383    assertSame(file, iter.next());
384    assertFalse(iter.hasNext());
385  }
386}