001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertThrows;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.ArgumentMatchers.anyLong;
023import static org.mockito.ArgumentMatchers.eq;
024import static org.mockito.Mockito.atLeastOnce;
025import static org.mockito.Mockito.doNothing;
026import static org.mockito.Mockito.doReturn;
027import static org.mockito.Mockito.doThrow;
028import static org.mockito.Mockito.inOrder;
029import static org.mockito.Mockito.mock;
030import static org.mockito.Mockito.never;
031import static org.mockito.Mockito.spy;
032import static org.mockito.Mockito.times;
033import static org.mockito.Mockito.verify;
034import static org.mockito.Mockito.when;
035
036import java.io.IOException;
037import java.util.Collections;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
042import org.apache.hadoop.hbase.replication.ReplicationQueueId;
043import org.apache.hadoop.hbase.testclassification.ReplicationTests;
044import org.apache.hadoop.hbase.testclassification.SmallTests;
045import org.apache.hadoop.hbase.wal.WAL.Entry;
046import org.apache.hadoop.hbase.wal.WALEdit;
047import org.apache.hadoop.hbase.wal.WALKeyImpl;
048import org.junit.jupiter.api.BeforeEach;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051import org.junit.jupiter.api.Timeout;
052import org.mockito.InOrder;
053
054@Tag(ReplicationTests.TAG)
055@Tag(SmallTests.TAG)
056public class TestReplicationSourceShipperRestart {
057
058  private ReplicationSource source;
059  private ReplicationSourceWALReader reader;
060  private MetricsSource metrics;
061  private ReplicationEndpoint endpoint;
062
063  private static final Path PATH = new Path("/test");
064
065  @BeforeEach
066  public void setup() {
067    source = mock(ReplicationSource.class);
068    reader = mock(ReplicationSourceWALReader.class);
069    metrics = mock(MetricsSource.class);
070    endpoint = mock(ReplicationEndpoint.class);
071
072    when(source.isPeerEnabled()).thenReturn(true);
073    when(source.isSourceActive()).thenReturn(true);
074    when(source.getSourceMetrics()).thenReturn(metrics);
075    when(source.getReplicationEndpoint()).thenReturn(endpoint);
076
077    ReplicationQueueId qid = mock(ReplicationQueueId.class);
078    when(source.getQueueId()).thenReturn(qid);
079
080    when(endpoint.replicate(any())).thenReturn(true);
081
082    doNothing().when(source).logPositionAndCleanOldLogs(any());
083  }
084
085  private WALEntryBatch emptyBatch() {
086    WALEntryBatch b = mock(WALEntryBatch.class);
087    when(b.getWalEntries()).thenReturn(Collections.emptyList());
088    when(b.getHeapSize()).thenReturn(100L);
089    when(b.getLastWalPosition()).thenReturn(1L);
090    when(b.getLastWalPath()).thenReturn(PATH);
091    return b;
092  }
093
094  private WALEntryBatch batch(long size) {
095    WALEntryBatch b = mock(WALEntryBatch.class);
096
097    Entry e = mock(Entry.class);
098    WALEdit edit = mock(WALEdit.class);
099    WALKeyImpl key = mock(WALKeyImpl.class);
100
101    when(key.getWriteTime()).thenReturn(1L);
102    when(key.getTableName()).thenReturn(TableName.valueOf("t"));
103
104    when(e.getKey()).thenReturn(key);
105    when(e.getEdit()).thenReturn(edit);
106
107    when(b.getWalEntries()).thenReturn(Collections.singletonList(e));
108    when(b.getHeapSize()).thenReturn(size);
109    when(b.getLastWalPosition()).thenReturn(1L);
110    when(b.getLastWalPath()).thenReturn(PATH);
111
112    return b;
113  }
114
115  /**
116   * Light shipper → skips persist logic
117   */
118  private ReplicationSourceShipper lightShipper(Configuration conf) {
119    return new ReplicationSourceShipper(conf, "group", source, reader) {
120      int loops = 0;
121
122      @Override
123      protected boolean isActive() {
124        return loops++ < 2;
125      }
126
127      @Override
128      protected void cleanUpHFileRefs(WALEdit edit) {
129      }
130
131      @Override
132      void persistLogPosition() {
133        // skip heavy logic
134      }
135    };
136  }
137
138  /**
139   * Real shipper → executes persist logic
140   */
141  private ReplicationSourceShipper realShipper(Configuration conf) {
142    return new ReplicationSourceShipper(conf, "group", source, reader) {
143      int loops = 0;
144
145      @Override
146      protected boolean isActive() {
147        return loops++ < 2;
148      }
149
150      @Override
151      protected void cleanUpHFileRefs(WALEdit edit) {
152      }
153    };
154  }
155
156  // ------------------------------------------------------------------------
157  // Restart
158  // ------------------------------------------------------------------------
159
160  @Test
161  public void testAbortAndRestart() {
162    ReplicationSourceShipper s =
163      new ReplicationSourceShipper(new Configuration(), "group", source, reader);
164
165    s.abortAndRestart(new IOException());
166
167    verify(source).restartShipper("group", s);
168  }
169
170  // ------------------------------------------------------------------------
171  // Empty batch
172  // ------------------------------------------------------------------------
173
174  @Test
175  public void testEmptyBatchTriggersPersist() throws Exception {
176    ReplicationSourceShipper s = spy(lightShipper(new Configuration()));
177
178    s.shipEdits(emptyBatch());
179
180    verify(s).persistLogPosition();
181  }
182
183  // ------------------------------------------------------------------------
184  // Default persist
185  // ------------------------------------------------------------------------
186
187  @Test
188  public void testDefaultImmediatePersist() throws Exception {
189    ReplicationSourceShipper s = spy(lightShipper(new Configuration()));
190
191    s.shipEdits(batch(50));
192
193    verify(s).persistLogPosition();
194  }
195
196  // ------------------------------------------------------------------------
197  // Size threshold
198  // ------------------------------------------------------------------------
199
200  @Test
201  public void testSizeThreshold() throws Exception {
202    Configuration conf = new Configuration();
203    conf.setLong("hbase.replication.shipper.offset.update.size.threshold", 100);
204
205    ReplicationSourceShipper s = spy(lightShipper(conf));
206
207    s.shipEdits(batch(40));
208    verify(s, never()).persistLogPosition();
209
210    s.shipEdits(batch(70));
211    verify(s, times(1)).persistLogPosition();
212  }
213
214  // ------------------------------------------------------------------------
215  // Time threshold
216  // ------------------------------------------------------------------------
217
218  @Test
219  public void testTimeThreshold() throws Exception {
220    Configuration conf = new Configuration();
221    conf.setLong("hbase.replication.shipper.offset.update.interval.ms", 1);
222
223    ReplicationSourceShipper s = spy(lightShipper(conf));
224
225    s.shipEdits(batch(10));
226    Thread.sleep(20);
227    s.shipEdits(batch(10));
228
229    verify(s, atLeastOnce()).persistLogPosition();
230  }
231
232  // ------------------------------------------------------------------------
233  // Hook test (FIXED)
234  // ------------------------------------------------------------------------
235
236  @Test
237  public void testBeforePersistHookCalled() throws Exception {
238    ReplicationSourceShipper s = realShipper(new Configuration());
239
240    s.shipEdits(emptyBatch());
241
242    verify(endpoint, atLeastOnce()).beforePersistingReplicationOffset();
243  }
244
245  // ------------------------------------------------------------------------
246  // Ordering test (FIXED)
247  // ------------------------------------------------------------------------
248
249  @Test
250  public void testBeforePersistCalledBeforeUpdate() throws Exception {
251    ReplicationSourceShipper s = realShipper(new Configuration());
252
253    InOrder order = inOrder(endpoint, source);
254
255    s.shipEdits(batch(100));
256
257    order.verify(endpoint).beforePersistingReplicationOffset();
258    order.verify(source).logPositionAndCleanOldLogs(any());
259  }
260
261  // ------------------------------------------------------------------------
262  // Persist failure
263  // ------------------------------------------------------------------------
264
265  @Test
266  public void testPersistFailureThrows() throws Exception {
267    doThrow(new IOException()).when(endpoint).beforePersistingReplicationOffset();
268
269    ReplicationSourceShipper shipper =
270      new ReplicationSourceShipper(new Configuration(), "group", source, reader);
271
272    assertThrows(IOException.class, () -> shipper.shipEdits(emptyBatch()));
273  }
274
275  // ------------------------------------------------------------------------
276  // Restart via run
277  // ------------------------------------------------------------------------
278
279  @Test
280  @Timeout(30)
281  public void testPersistFailureTriggersRestart() throws Exception {
282
283    WALEntryBatch batch = emptyBatch();
284
285    when(reader.poll(anyLong())).thenReturn(batch).thenThrow(new InterruptedException());
286
287    doThrow(new IOException()).when(endpoint).beforePersistingReplicationOffset();
288
289    ReplicationSourceShipper shipper =
290      new ReplicationSourceShipper(new Configuration(), "group", source, reader) {
291        int loops = 0;
292
293        @Override
294        protected boolean isActive() {
295          return loops++ < 2;
296        }
297      };
298
299    shipper.start();
300    shipper.join();
301
302    verify(source, atLeastOnce()).restartShipper(eq("group"), eq(shipper));
303  }
304
305  // ------------------------------------------------------------------------
306  // Normal run
307  // ------------------------------------------------------------------------
308
309  @Test
310  @Timeout(30)
311  public void testRunNormalFlow() throws Exception {
312
313    WALEntryBatch batch = emptyBatch();
314
315    when(reader.poll(anyLong())).thenReturn(batch).thenThrow(new InterruptedException());
316
317    ReplicationSourceShipper s = lightShipper(new Configuration());
318
319    ReplicationSourceShipper spy = spy(s);
320    doReturn(true, true, false).when(spy).isActive();
321
322    spy.start();
323    spy.join();
324
325    verify(source, never()).restartShipper(any(), any());
326  }
327}