001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertThrows; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.ArgumentMatchers.anyLong; 023import static org.mockito.ArgumentMatchers.eq; 024import static org.mockito.Mockito.atLeastOnce; 025import static org.mockito.Mockito.doNothing; 026import static org.mockito.Mockito.doReturn; 027import static org.mockito.Mockito.doThrow; 028import static org.mockito.Mockito.inOrder; 029import static org.mockito.Mockito.mock; 030import static org.mockito.Mockito.never; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.times; 033import static org.mockito.Mockito.verify; 034import static org.mockito.Mockito.when; 035 036import java.io.IOException; 037import java.util.Collections; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 042import org.apache.hadoop.hbase.replication.ReplicationQueueId; 043import org.apache.hadoop.hbase.testclassification.ReplicationTests; 044import org.apache.hadoop.hbase.testclassification.SmallTests; 045import org.apache.hadoop.hbase.wal.WAL.Entry; 046import org.apache.hadoop.hbase.wal.WALEdit; 047import org.apache.hadoop.hbase.wal.WALKeyImpl; 048import org.junit.jupiter.api.BeforeEach; 049import org.junit.jupiter.api.Tag; 050import org.junit.jupiter.api.Test; 051import org.junit.jupiter.api.Timeout; 052import org.mockito.InOrder; 053 054@Tag(ReplicationTests.TAG) 055@Tag(SmallTests.TAG) 056public class TestReplicationSourceShipperRestart { 057 058 private ReplicationSource source; 059 private ReplicationSourceWALReader reader; 060 private MetricsSource metrics; 061 private ReplicationEndpoint endpoint; 062 063 private static final Path PATH = new Path("/test"); 064 065 @BeforeEach 066 public void setup() { 067 source = mock(ReplicationSource.class); 068 reader = mock(ReplicationSourceWALReader.class); 069 metrics = mock(MetricsSource.class); 070 endpoint = mock(ReplicationEndpoint.class); 071 072 when(source.isPeerEnabled()).thenReturn(true); 073 when(source.isSourceActive()).thenReturn(true); 074 when(source.getSourceMetrics()).thenReturn(metrics); 075 when(source.getReplicationEndpoint()).thenReturn(endpoint); 076 077 ReplicationQueueId qid = mock(ReplicationQueueId.class); 078 when(source.getQueueId()).thenReturn(qid); 079 080 when(endpoint.replicate(any())).thenReturn(true); 081 082 doNothing().when(source).logPositionAndCleanOldLogs(any()); 083 } 084 085 private WALEntryBatch emptyBatch() { 086 WALEntryBatch b = mock(WALEntryBatch.class); 087 when(b.getWalEntries()).thenReturn(Collections.emptyList()); 088 when(b.getHeapSize()).thenReturn(100L); 089 when(b.getLastWalPosition()).thenReturn(1L); 090 when(b.getLastWalPath()).thenReturn(PATH); 091 return b; 092 } 093 094 private WALEntryBatch batch(long size) { 095 WALEntryBatch b = mock(WALEntryBatch.class); 096 097 Entry e = mock(Entry.class); 098 WALEdit edit = mock(WALEdit.class); 099 WALKeyImpl key = mock(WALKeyImpl.class); 100 101 when(key.getWriteTime()).thenReturn(1L); 102 when(key.getTableName()).thenReturn(TableName.valueOf("t")); 103 104 when(e.getKey()).thenReturn(key); 105 when(e.getEdit()).thenReturn(edit); 106 107 when(b.getWalEntries()).thenReturn(Collections.singletonList(e)); 108 when(b.getHeapSize()).thenReturn(size); 109 when(b.getLastWalPosition()).thenReturn(1L); 110 when(b.getLastWalPath()).thenReturn(PATH); 111 112 return b; 113 } 114 115 /** 116 * Light shipper → skips persist logic 117 */ 118 private ReplicationSourceShipper lightShipper(Configuration conf) { 119 return new ReplicationSourceShipper(conf, "group", source, reader) { 120 int loops = 0; 121 122 @Override 123 protected boolean isActive() { 124 return loops++ < 2; 125 } 126 127 @Override 128 protected void cleanUpHFileRefs(WALEdit edit) { 129 } 130 131 @Override 132 void persistLogPosition() { 133 // skip heavy logic 134 } 135 }; 136 } 137 138 /** 139 * Real shipper → executes persist logic 140 */ 141 private ReplicationSourceShipper realShipper(Configuration conf) { 142 return new ReplicationSourceShipper(conf, "group", source, reader) { 143 int loops = 0; 144 145 @Override 146 protected boolean isActive() { 147 return loops++ < 2; 148 } 149 150 @Override 151 protected void cleanUpHFileRefs(WALEdit edit) { 152 } 153 }; 154 } 155 156 // ------------------------------------------------------------------------ 157 // Restart 158 // ------------------------------------------------------------------------ 159 160 @Test 161 public void testAbortAndRestart() { 162 ReplicationSourceShipper s = 163 new ReplicationSourceShipper(new Configuration(), "group", source, reader); 164 165 s.abortAndRestart(new IOException()); 166 167 verify(source).restartShipper("group", s); 168 } 169 170 // ------------------------------------------------------------------------ 171 // Empty batch 172 // ------------------------------------------------------------------------ 173 174 @Test 175 public void testEmptyBatchTriggersPersist() throws Exception { 176 ReplicationSourceShipper s = spy(lightShipper(new Configuration())); 177 178 s.shipEdits(emptyBatch()); 179 180 verify(s).persistLogPosition(); 181 } 182 183 // ------------------------------------------------------------------------ 184 // Default persist 185 // ------------------------------------------------------------------------ 186 187 @Test 188 public void testDefaultImmediatePersist() throws Exception { 189 ReplicationSourceShipper s = spy(lightShipper(new Configuration())); 190 191 s.shipEdits(batch(50)); 192 193 verify(s).persistLogPosition(); 194 } 195 196 // ------------------------------------------------------------------------ 197 // Size threshold 198 // ------------------------------------------------------------------------ 199 200 @Test 201 public void testSizeThreshold() throws Exception { 202 Configuration conf = new Configuration(); 203 conf.setLong("hbase.replication.shipper.offset.update.size.threshold", 100); 204 205 ReplicationSourceShipper s = spy(lightShipper(conf)); 206 207 s.shipEdits(batch(40)); 208 verify(s, never()).persistLogPosition(); 209 210 s.shipEdits(batch(70)); 211 verify(s, times(1)).persistLogPosition(); 212 } 213 214 // ------------------------------------------------------------------------ 215 // Time threshold 216 // ------------------------------------------------------------------------ 217 218 @Test 219 public void testTimeThreshold() throws Exception { 220 Configuration conf = new Configuration(); 221 conf.setLong("hbase.replication.shipper.offset.update.interval.ms", 1); 222 223 ReplicationSourceShipper s = spy(lightShipper(conf)); 224 225 s.shipEdits(batch(10)); 226 Thread.sleep(20); 227 s.shipEdits(batch(10)); 228 229 verify(s, atLeastOnce()).persistLogPosition(); 230 } 231 232 // ------------------------------------------------------------------------ 233 // Hook test (FIXED) 234 // ------------------------------------------------------------------------ 235 236 @Test 237 public void testBeforePersistHookCalled() throws Exception { 238 ReplicationSourceShipper s = realShipper(new Configuration()); 239 240 s.shipEdits(emptyBatch()); 241 242 verify(endpoint, atLeastOnce()).beforePersistingReplicationOffset(); 243 } 244 245 // ------------------------------------------------------------------------ 246 // Ordering test (FIXED) 247 // ------------------------------------------------------------------------ 248 249 @Test 250 public void testBeforePersistCalledBeforeUpdate() throws Exception { 251 ReplicationSourceShipper s = realShipper(new Configuration()); 252 253 InOrder order = inOrder(endpoint, source); 254 255 s.shipEdits(batch(100)); 256 257 order.verify(endpoint).beforePersistingReplicationOffset(); 258 order.verify(source).logPositionAndCleanOldLogs(any()); 259 } 260 261 // ------------------------------------------------------------------------ 262 // Persist failure 263 // ------------------------------------------------------------------------ 264 265 @Test 266 public void testPersistFailureThrows() throws Exception { 267 doThrow(new IOException()).when(endpoint).beforePersistingReplicationOffset(); 268 269 ReplicationSourceShipper shipper = 270 new ReplicationSourceShipper(new Configuration(), "group", source, reader); 271 272 assertThrows(IOException.class, () -> shipper.shipEdits(emptyBatch())); 273 } 274 275 // ------------------------------------------------------------------------ 276 // Restart via run 277 // ------------------------------------------------------------------------ 278 279 @Test 280 @Timeout(30) 281 public void testPersistFailureTriggersRestart() throws Exception { 282 283 WALEntryBatch batch = emptyBatch(); 284 285 when(reader.poll(anyLong())).thenReturn(batch).thenThrow(new InterruptedException()); 286 287 doThrow(new IOException()).when(endpoint).beforePersistingReplicationOffset(); 288 289 ReplicationSourceShipper shipper = 290 new ReplicationSourceShipper(new Configuration(), "group", source, reader) { 291 int loops = 0; 292 293 @Override 294 protected boolean isActive() { 295 return loops++ < 2; 296 } 297 }; 298 299 shipper.start(); 300 shipper.join(); 301 302 verify(source, atLeastOnce()).restartShipper(eq("group"), eq(shipper)); 303 } 304 305 // ------------------------------------------------------------------------ 306 // Normal run 307 // ------------------------------------------------------------------------ 308 309 @Test 310 @Timeout(30) 311 public void testRunNormalFlow() throws Exception { 312 313 WALEntryBatch batch = emptyBatch(); 314 315 when(reader.poll(anyLong())).thenReturn(batch).thenThrow(new InterruptedException()); 316 317 ReplicationSourceShipper s = lightShipper(new Configuration()); 318 319 ReplicationSourceShipper spy = spy(s); 320 doReturn(true, true, false).when(spy).isActive(); 321 322 spy.start(); 323 spy.join(); 324 325 verify(source, never()).restartShipper(any(), any()); 326 } 327}