/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.emptyIterable;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

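/**
 * Unit tests for {@link ReplicationLogCleaner}, driving it against mocked {@link MasterServices},
 * replication peers and replication queue storage to check which WAL files it reports as
 * deletable.
 */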
@Category({ MasterTests.class, SmallTests.class })
public class TestReplicationLogCleaner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationLogCleaner.class);

  private static final Configuration CONF = HBaseConfiguration.create();

  private MasterServices services;

  private ReplicationLogCleaner cleaner;

  private ReplicationPeerManager rpm;

  @Before
  public void setUp() throws ReplicationException {
    services = mock(MasterServices.class);
    when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
    AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class);
    when(services.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
    when(asyncClusterConnection.isClosed()).thenReturn(false);
    rpm = mock(ReplicationPeerManager.class);
    when(services.getReplicationPeerManager()).thenReturn(rpm);
    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
    ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
    when(rpm.getQueueStorage()).thenReturn(rqs);
    when(rqs.hasData()).thenReturn(true);
    when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
    ServerManager sm = mock(ServerManager.class);
    when(services.getServerManager()).thenReturn(sm);
    when(sm.getOnlineServersList()).thenReturn(new ArrayList<>());
    @SuppressWarnings("unchecked")
    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
    when(procExec.getProcedures()).thenReturn(new ArrayList<>());

    cleaner = new ReplicationLogCleaner();
    cleaner.setConf(CONF);
    Map<String, Object> params = ImmutableMap.of(HMaster.MASTER, services);
    cleaner.init(params);
  }

  @After
  public void tearDown() {
    cleaner.postClean();
  }

  private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner,
    Iterable<FileStatus> files) {
    cleaner.preClean();
    return cleaner.getDeletableFiles(files);
  }

  private static FileStatus createFileStatus(Path path) {
    return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path);
  }

  private static FileStatus createFileStatus(ServerName sn, int number) {
    Path path = new Path(sn.toString() + "." + number);
    return createFileStatus(path);
  }

  private static ReplicationPeerDescription createPeer(String peerId) {
    return new ReplicationPeerDescription(peerId, true, null, null);
  }

  private void addServer(ServerName serverName) {
    services.getServerManager().getOnlineServersList().add(serverName);
  }

  private void addSCP(ServerName serverName, boolean finished) {
    ServerCrashProcedure scp = mock(ServerCrashProcedure.class);
    when(scp.getServerName()).thenReturn(serverName);
    when(scp.isFinished()).thenReturn(finished);
    services.getMasterProcedureExecutor().getProcedures().add(scp);
  }

  private void addPeer(String... peerIds) {
    services.getReplicationPeerManager().listPeers(null).addAll(
      Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList()));
  }

  private void addQueueData(ReplicationQueueData... datas) throws ReplicationException {
    services.getReplicationPeerManager().getQueueStorage().listAllQueues()
      .addAll(Arrays.asList(datas));
  }

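  // Each test below assembles a small cluster picture through the helpers above (replication
  // peers, online region servers, ServerCrashProcedures and replication queue data) and then
  // checks which of the given WAL files the cleaner reports as deletable.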
  @Test
  public void testNoConf() {
    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
    List<FileStatus> files = Arrays.asList(new FileStatus());
    assertSame(files, runCleaner(cleaner, files));
    cleaner.postClean();
  }

  @Test
  public void testCanNotFilter() {
    assertTrue(services.getReplicationLogCleanerBarrier().disable());
    List<FileStatus> files = Arrays.asList(new FileStatus());
    assertSame(Collections.emptyList(), runCleaner(cleaner, files));
  }

  @Test
  public void testNoPeer() {
    Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime());
    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
    FileStatus file = createFileStatus(path);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

  @Test
  public void testNotValidWalFile() {
    addPeer("1");
    Path path = new Path("/whatever");
    assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName()));
    FileStatus file = createFileStatus(path);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

  @Test
  public void testMetaWalFile() {
    addPeer("1");
    Path path = new Path(
      "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID);
    assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
    assertTrue(AbstractFSWALProvider.isMetaFile(path));
    FileStatus file = createFileStatus(path);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

  @Test
  public void testLiveRegionServerNoQueues() {
    addPeer("1");
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    addServer(sn);
    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
    assertThat(runCleaner(cleaner, files), emptyIterable());
  }

  @Test
  public void testLiveRegionServerWithSCPNoQueues() {
    addPeer("1");
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    addSCP(sn, false);
    List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
    assertThat(runCleaner(cleaner, files), emptyIterable());
  }

  @Test
  public void testDeadRegionServerNoQueues() {
    addPeer("1");
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    FileStatus file = createFileStatus(sn, 1);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

  @Test
  public void testDeadRegionServerWithSCPNoQueues() {
    addPeer("1");
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    addSCP(sn, true);
    FileStatus file = createFileStatus(sn, 1);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

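  // The remaining WAL-retention tests seed ReplicationQueueData with per-group offsets. As
  // exercised by these tests, a ReplicationGroupOffset of -1 for a WAL means the peer has
  // finished replicating it (so it stops blocking deletion), while an offset of 0 means the WAL
  // is still referenced and must be kept.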
  @Test
  public void testLiveRegionServerMissingQueue() throws ReplicationException {
    String peerId1 = "1";
    String peerId2 = "2";
    addPeer(peerId1, peerId2);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    addServer(sn);
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    addQueueData(data1);
    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
  }

  @Test
  public void testLiveRegionServerShouldNotDelete() throws ReplicationException {
    String peerId = "1";
    addPeer(peerId);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    addServer(sn);
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
    addQueueData(data);
    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
  }

  @Test
  public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
    String peerId1 = "1";
    String peerId2 = "2";
    addPeer(peerId1, peerId2);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    addServer(sn);
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
    addQueueData(data1, data2);
    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
  }

  @Test
  public void testLiveRegionServerShouldDelete() throws ReplicationException {
    String peerId = "1";
    addPeer(peerId);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    addServer(sn);
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

  @Test
  public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException {
    String peerId1 = "1";
    String peerId2 = "2";
    addPeer(peerId1, peerId2);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    addServer(sn);
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    addQueueData(data1, data2);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

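  // Dead-region-server variants of the tests above: the server is neither online nor covered by
  // an unfinished ServerCrashProcedure, so a peer with no remaining queue data no longer blocks
  // deletion (contrast with testLiveRegionServerMissingQueue).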
  @Test
  public void testDeadRegionServerMissingQueue() throws ReplicationException {
    String peerId1 = "1";
    String peerId2 = "2";
    addPeer(peerId1, peerId2);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    addQueueData(data1);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

  @Test
  public void testDeadRegionServerShouldNotDelete() throws ReplicationException {
    String peerId = "1";
    addPeer(peerId);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
    addQueueData(data);
    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
  }

  @Test
  public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
    String peerId1 = "1";
    String peerId2 = "2";
    addPeer(peerId1, peerId2);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
    addQueueData(data1, data2);
    assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
  }

  @Test
  public void testDeadRegionServerShouldDelete() throws ReplicationException {
    String peerId = "1";
    addPeer(peerId);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

  @Test
  public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException {
    String peerId1 = "1";
    String peerId2 = "2";
    addPeer(peerId1, peerId2);
    ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
    FileStatus file = createFileStatus(sn, 1);
    ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
      ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
    addQueueData(data1, data2);
    Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
    assertSame(file, iter.next());
    assertFalse(iter.hasNext());
  }

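  // The two tests below cover the closed-connection guard: once the master's
  // AsyncClusterConnection reports closed, both preClean() and getDeletableFiles() return early,
  // which is observed here as hasData() not being invoked again on the queue storage.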
  @Test
  public void testPreCleanWhenAsyncClusterConnectionClosed() throws ReplicationException {
    assertFalse(services.getAsyncClusterConnection().isClosed());
    verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
    cleaner.preClean();
    verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();

    when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
    assertTrue(services.getAsyncClusterConnection().isClosed());
    verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
    cleaner.preClean();
    verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
    // rpm.getQueueStorage().hasData() was not executed, indicating an early return.
    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
  }

  @Test
  public void testGetDeletableFilesWhenAsyncClusterConnectionClosed() throws ReplicationException {
    List<FileStatus> files = List.of(new FileStatus());
    assertFalse(services.getAsyncClusterConnection().isClosed());
    verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
    cleaner.getDeletableFiles(files);
    verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();

    when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
    assertTrue(services.getAsyncClusterConnection().isClosed());
    verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
    cleaner.getDeletableFiles(files);
    verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
    // rpm.getQueueStorage().hasData() was not executed, indicating an early return.
    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
  }
}