001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.master; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.emptyIterable; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertSame; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.verify; 027import static org.mockito.Mockito.when; 028 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.stream.Collectors; 036import java.util.stream.Stream; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileStatus; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.client.AsyncClusterConnection; 043import org.apache.hadoop.hbase.master.HMaster; 044import org.apache.hadoop.hbase.master.MasterServices; 045import org.apache.hadoop.hbase.master.ServerManager; 046import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 047import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 048import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; 049import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 050import org.apache.hadoop.hbase.replication.ReplicationException; 051import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 052import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 053import org.apache.hadoop.hbase.replication.ReplicationQueueData; 054import org.apache.hadoop.hbase.replication.ReplicationQueueId; 055import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 056import org.apache.hadoop.hbase.testclassification.MasterTests; 057import org.apache.hadoop.hbase.testclassification.SmallTests; 058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 059import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 060import org.junit.jupiter.api.AfterEach; 061import org.junit.jupiter.api.BeforeEach; 062import org.junit.jupiter.api.Tag; 063import org.junit.jupiter.api.Test; 064import org.mockito.Mockito; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 067 068@Tag(MasterTests.TAG) 069@Tag(SmallTests.TAG) 070public class TestReplicationLogCleaner { 071 072 private static final Configuration CONF = HBaseConfiguration.create(); 073 074 private MasterServices services; 075 076 private ReplicationLogCleaner cleaner; 077 078 private ReplicationPeerManager rpm; 079 080 @BeforeEach 081 public void setUp() throws ReplicationException { 082 services = mock(MasterServices.class); 083 when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); 084 AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class); 085 when(services.getAsyncClusterConnection()).thenReturn(asyncClusterConnection); 086 when(asyncClusterConnection.isClosed()).thenReturn(false); 087 rpm = mock(ReplicationPeerManager.class); 088 when(services.getReplicationPeerManager()).thenReturn(rpm); 089 when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); 090 ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); 091 when(rpm.getQueueStorage()).thenReturn(rqs); 092 when(rqs.hasData()).thenReturn(true); 093 when(rqs.listAllQueues()).thenReturn(new ArrayList<>()); 094 ServerManager sm = mock(ServerManager.class); 095 when(services.getServerManager()).thenReturn(sm); 096 when(sm.getOnlineServersList()).thenReturn(new ArrayList<>()); 097 @SuppressWarnings("unchecked") 098 ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class); 099 when(services.getMasterProcedureExecutor()).thenReturn(procExec); 100 when(procExec.getProcedures()).thenReturn(new ArrayList<>()); 101 102 cleaner = new ReplicationLogCleaner(); 103 cleaner.setConf(CONF); 104 Map<String, Object> params = ImmutableMap.of(HMaster.MASTER, services); 105 cleaner.init(params); 106 } 107 108 @AfterEach 109 public void tearDown() { 110 cleaner.postClean(); 111 } 112 113 private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner, 114 Iterable<FileStatus> files) { 115 cleaner.preClean(); 116 return cleaner.getDeletableFiles(files); 117 } 118 119 private static FileStatus createFileStatus(Path path) { 120 return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path); 121 } 122 123 private static FileStatus createFileStatus(ServerName sn, int number) { 124 Path path = new Path(sn.toString() + "." + number); 125 return createFileStatus(path); 126 } 127 128 private static ReplicationPeerDescription createPeer(String peerId) { 129 return new ReplicationPeerDescription(peerId, true, null, null); 130 } 131 132 private void addServer(ServerName serverName) { 133 services.getServerManager().getOnlineServersList().add(serverName); 134 } 135 136 private void addSCP(ServerName serverName, boolean finished) { 137 ServerCrashProcedure scp = mock(ServerCrashProcedure.class); 138 when(scp.getServerName()).thenReturn(serverName); 139 when(scp.isFinished()).thenReturn(finished); 140 services.getMasterProcedureExecutor().getProcedures().add(scp); 141 } 142 143 private void addPeer(String... peerIds) { 144 services.getReplicationPeerManager().listPeers(null).addAll( 145 Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList())); 146 } 147 148 private void addQueueData(ReplicationQueueData... datas) throws ReplicationException { 149 services.getReplicationPeerManager().getQueueStorage().listAllQueues() 150 .addAll(Arrays.asList(datas)); 151 } 152 153 @Test 154 public void testNoConf() { 155 ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); 156 List<FileStatus> files = Arrays.asList(new FileStatus()); 157 assertSame(files, runCleaner(cleaner, files)); 158 cleaner.postClean(); 159 } 160 161 @Test 162 public void testCanNotFilter() { 163 assertTrue(services.getReplicationLogCleanerBarrier().disable()); 164 List<FileStatus> files = Arrays.asList(new FileStatus()); 165 assertSame(Collections.emptyList(), runCleaner(cleaner, files)); 166 } 167 168 @Test 169 public void testNoPeer() { 170 Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime()); 171 assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName())); 172 FileStatus file = createFileStatus(path); 173 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 174 assertSame(file, iter.next()); 175 assertFalse(iter.hasNext()); 176 } 177 178 @Test 179 public void testNotValidWalFile() { 180 addPeer("1"); 181 Path path = new Path("/whatever"); 182 assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName())); 183 FileStatus file = createFileStatus(path); 184 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 185 assertSame(file, iter.next()); 186 assertFalse(iter.hasNext()); 187 } 188 189 @Test 190 public void testMetaWalFile() { 191 addPeer("1"); 192 Path path = new Path( 193 "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID); 194 assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName())); 195 assertTrue(AbstractFSWALProvider.isMetaFile(path)); 196 FileStatus file = createFileStatus(path); 197 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 198 assertSame(file, iter.next()); 199 assertFalse(iter.hasNext()); 200 } 201 202 @Test 203 public void testLiveRegionServerNoQueues() { 204 addPeer("1"); 205 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 206 addServer(sn); 207 List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1)); 208 assertThat(runCleaner(cleaner, files), emptyIterable()); 209 } 210 211 @Test 212 public void testLiveRegionServerWithSCPNoQueues() { 213 addPeer("1"); 214 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 215 addSCP(sn, false); 216 List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1)); 217 assertThat(runCleaner(cleaner, files), emptyIterable()); 218 } 219 220 @Test 221 public void testDeadRegionServerNoQueues() { 222 addPeer("1"); 223 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 224 FileStatus file = createFileStatus(sn, 1); 225 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 226 assertSame(file, iter.next()); 227 assertFalse(iter.hasNext()); 228 } 229 230 @Test 231 public void testDeadRegionServerWithSCPNoQueues() { 232 addPeer("1"); 233 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 234 addSCP(sn, true); 235 FileStatus file = createFileStatus(sn, 1); 236 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 237 assertSame(file, iter.next()); 238 assertFalse(iter.hasNext()); 239 } 240 241 @Test 242 public void testLiveRegionServerMissingQueue() throws ReplicationException { 243 String peerId1 = "1"; 244 String peerId2 = "2"; 245 addPeer(peerId1, peerId2); 246 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 247 addServer(sn); 248 FileStatus file = createFileStatus(sn, 1); 249 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 250 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 251 addQueueData(data1); 252 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 253 } 254 255 @Test 256 public void testLiveRegionServerShouldNotDelete() throws ReplicationException { 257 String peerId = "1"; 258 addPeer(peerId); 259 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 260 addServer(sn); 261 FileStatus file = createFileStatus(sn, 1); 262 ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), 263 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); 264 addQueueData(data); 265 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 266 } 267 268 @Test 269 public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException { 270 String peerId1 = "1"; 271 String peerId2 = "2"; 272 addPeer(peerId1, peerId2); 273 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 274 addServer(sn); 275 FileStatus file = createFileStatus(sn, 1); 276 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 277 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 278 ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), 279 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); 280 addQueueData(data1, data2); 281 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 282 } 283 284 @Test 285 public void testLiveRegionServerShouldDelete() throws ReplicationException { 286 String peerId = "1"; 287 addPeer(peerId); 288 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 289 addServer(sn); 290 FileStatus file = createFileStatus(sn, 1); 291 ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), 292 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 293 services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data); 294 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 295 assertSame(file, iter.next()); 296 assertFalse(iter.hasNext()); 297 } 298 299 @Test 300 public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException { 301 String peerId1 = "1"; 302 String peerId2 = "2"; 303 addPeer(peerId1, peerId2); 304 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 305 addServer(sn); 306 FileStatus file = createFileStatus(sn, 1); 307 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 308 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 309 ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), 310 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 311 addQueueData(data1, data2); 312 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 313 assertSame(file, iter.next()); 314 assertFalse(iter.hasNext()); 315 } 316 317 @Test 318 public void testDeadRegionServerMissingQueue() throws ReplicationException { 319 String peerId1 = "1"; 320 String peerId2 = "2"; 321 addPeer(peerId1, peerId2); 322 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 323 FileStatus file = createFileStatus(sn, 1); 324 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 325 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 326 addQueueData(data1); 327 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 328 assertSame(file, iter.next()); 329 assertFalse(iter.hasNext()); 330 } 331 332 @Test 333 public void testDeadRegionServerShouldNotDelete() throws ReplicationException { 334 String peerId = "1"; 335 addPeer(peerId); 336 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 337 FileStatus file = createFileStatus(sn, 1); 338 ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), 339 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); 340 addQueueData(data); 341 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 342 } 343 344 @Test 345 public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException { 346 String peerId1 = "1"; 347 String peerId2 = "2"; 348 addPeer(peerId1, peerId2); 349 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 350 FileStatus file = createFileStatus(sn, 1); 351 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 352 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 353 ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), 354 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); 355 addQueueData(data1, data2); 356 assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); 357 } 358 359 @Test 360 public void testDeadRegionServerShouldDelete() throws ReplicationException { 361 String peerId = "1"; 362 addPeer(peerId); 363 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 364 FileStatus file = createFileStatus(sn, 1); 365 ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), 366 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 367 services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data); 368 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 369 assertSame(file, iter.next()); 370 assertFalse(iter.hasNext()); 371 } 372 373 @Test 374 public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException { 375 String peerId1 = "1"; 376 String peerId2 = "2"; 377 addPeer(peerId1, peerId2); 378 ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); 379 FileStatus file = createFileStatus(sn, 1); 380 ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), 381 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 382 ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), 383 ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); 384 addQueueData(data1, data2); 385 Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); 386 assertSame(file, iter.next()); 387 assertFalse(iter.hasNext()); 388 } 389 390 @Test 391 public void testPreCleanWhenAsyncClusterConnectionClosed() throws ReplicationException { 392 assertFalse(services.getAsyncClusterConnection().isClosed()); 393 verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed(); 394 cleaner.preClean(); 395 verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed(); 396 verify(rpm.getQueueStorage(), Mockito.times(1)).hasData(); 397 398 when(services.getAsyncClusterConnection().isClosed()).thenReturn(true); 399 assertTrue(services.getAsyncClusterConnection().isClosed()); 400 verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed(); 401 cleaner.preClean(); 402 verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed(); 403 // rpm.getQueueStorage().hasData() was not executed, indicating an early return. 404 verify(rpm.getQueueStorage(), Mockito.times(1)).hasData(); 405 } 406 407 @Test 408 public void testGetDeletableFilesWhenAsyncClusterConnectionClosed() throws ReplicationException { 409 List<FileStatus> files = List.of(new FileStatus()); 410 assertFalse(services.getAsyncClusterConnection().isClosed()); 411 verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed(); 412 cleaner.getDeletableFiles(files); 413 verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed(); 414 verify(rpm.getQueueStorage(), Mockito.times(1)).hasData(); 415 416 when(services.getAsyncClusterConnection().isClosed()).thenReturn(true); 417 assertTrue(services.getAsyncClusterConnection().isClosed()); 418 verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed(); 419 cleaner.getDeletableFiles(files); 420 verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed(); 421 // rpm.getQueueStorage().hasData() was not executed, indicating an early return. 422 verify(rpm.getQueueStorage(), Mockito.times(1)).hasData(); 423 } 424}