/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

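/**
 * Tests for {@link ReplicationSourceManager}: adding and removing peers, claiming replication
 * queues from dead region servers, metrics cleanup, remote WAL cleanup for synchronous
 * replication, and per-peer configuration overrides.
 */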
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

  public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint {

    private String clusterKey;

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // Throw an exception to block replication, for example, when a test does not want the
      // recovered source to be removed.
      if (clusterKey.endsWith("error")) {
        throw new RuntimeException("Inject error");
      }
      return true;
    }

    @Override
    public void init(Context context) throws IOException {
      super.init(context);
      this.clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey();
    }

  }

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static Configuration CONF;

  private static FileSystem FS;

  private static final byte[] F1 = Bytes.toBytes("f1");

  private static final byte[] F2 = Bytes.toBytes("f2");

  private static final TableName TABLE_NAME = TableName.valueOf("test");

  private static RegionInfo RI;

  private static NavigableMap<byte[], Integer> SCOPES;

  @Rule
  public final TestName name = new TestName();

  private Path oldLogDir;

  private Path logDir;

  private Path remoteLogDir;

  private Server server;

  private Replication replication;

  private ReplicationSourceManager manager;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.startMiniCluster(1);
    FS = UTIL.getTestFileSystem();
    CONF = new Configuration(UTIL.getConfiguration());
    CONF.setLong("replication.sleep.before.failover", 0);

    RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    SCOPES.put(F1, 1);
    SCOPES.put(F2, 0);
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    Path rootDir = UTIL.getDataTestDirOnTestFS(name.getMethodName());
    CommonFSUtils.setRootDir(CONF, rootDir);
    server = mock(Server.class);
    when(server.getConfiguration()).thenReturn(CONF);
    when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
    when(server.getConnection()).thenReturn(UTIL.getConnection());
    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
    when(server.getServerName()).thenReturn(sn);
    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    FS.mkdirs(oldLogDir);
    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
    FS.mkdirs(logDir);
    remoteLogDir = new Path(rootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
    FS.mkdirs(remoteLogDir);
    TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
    UTIL.getAdmin()
      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());

    replication = new Replication();
    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
      new WALFactory(CONF, server.getServerName(), null));
    manager = replication.getReplicationManager();
  }

  @After
  public void tearDown() {
    replication.stopReplicationService();
  }

  /**
   * Add a peer and wait for it to initialize
   */
  private void addPeerAndWait(String peerId, String clusterKey, boolean syncRep)
    throws ReplicationException, IOException {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
    if (syncRep) {
      builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList()))
        .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString());
    }

    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, builder.build(), true,
      syncRep ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE);
    manager.addPeer(peerId);
    UTIL.waitFor(20000, () -> {
      ReplicationSourceInterface rs = manager.getSource(peerId);
      return rs != null && rs.isSourceActive();
    });
  }

  /**
   * Remove a peer and wait for it to get cleaned up
   */
  private void removePeerAndWait(String peerId) throws Exception {
    ReplicationPeers rp = manager.getReplicationPeers();
    rp.getPeerStorage().removePeer(peerId);
    manager.removePeer(peerId);
    UTIL.waitFor(20000, () -> {
      if (rp.getPeer(peerId) != null) {
        return false;
      }
      if (manager.getSource(peerId) != null) {
        return false;
      }
      return manager.getOldSources().stream().noneMatch(rs -> rs.getPeerId().equals(peerId));
    });
  }

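  /**
   * Write a single WAL entry to the given file, containing one cell for family f1 (replication
   * scope 1) and one cell for family f2 (replication scope 0).
   */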
  private void createWALFile(Path file) throws Exception {
    ProtobufLogWriter writer = new ProtobufLogWriter();
    try {
      writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file), null);
      WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME,
        EnvironmentEdgeManager.currentTime(), SCOPES);
      WALEdit edit = new WALEdit();
      WALEditInternalHelper.addExtendedCell(edit,
        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1).setFamily(F1)
          .setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build());
      WALEditInternalHelper.addExtendedCell(edit,
        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2).setFamily(F2)
          .setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build());
      writer.append(new WAL.Entry(key, edit));
      writer.sync(false);
    } finally {
      writer.close();
    }
  }

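  /**
   * Claim the replication queue of a dead region server and verify that a recovered source is
   * created for it. The peer's cluster key ends with "error" so the test endpoint keeps failing,
   * which prevents the recovered source from being removed before the assertion.
   */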
  @Test
  public void testClaimQueue() throws Exception {
    String peerId = "1";
    addPeerAndWait(peerId, "error", false);
    ServerName serverName = ServerName.valueOf("hostname0.example.org", 12345, 123);
    String walName1 = serverName.toString() + ".1";
    createWALFile(new Path(oldLogDir, walName1));
    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
    ReplicationQueueStorage queueStorage = manager.getQueueStorage();
    queueStorage.setOffset(queueId, "", new ReplicationGroupOffset(peerId, 0),
      Collections.emptyMap());
    manager.claimQueue(queueId);
    assertThat(manager.getOldSources(), hasSize(1));
  }

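  /**
   * Verify that two WALs are tracked as separate WAL groups when one WAL prefix is a prefix of
   * the other.
   */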
  @Test
  public void testSameWALPrefix() throws IOException {
    String walName1 = "localhost,8080,12345-45678-Peer.34567";
    String walName2 = "localhost,8080,12345.56789";
    manager.postLogRoll(new Path(walName1));
    manager.postLogRoll(new Path(walName2));

    Set<String> latestWals =
      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
    assertThat(latestWals,
      Matchers.<Set<String>> both(hasSize(2)).and(hasItems(walName1, walName2)));
  }

  private MetricsReplicationSourceSource getGlobalSource() {
    return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
      .getGlobalSource();
  }

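  /**
   * Verify that removing a peer resets the global log queue metrics, and that re-adding the same
   * peer starts again from a clean per-source metric.
   */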
  @Test
  public void testRemovePeerMetricsCleanup() throws Exception {
    MetricsReplicationSourceSource globalSource = getGlobalSource();
    int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
    String peerId = "DummyPeer";
    addPeerAndWait(peerId, "hbase", false);
    // there are no latestPaths, so the size of the log queue should not change
    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

    ReplicationSourceInterface source = manager.getSource(peerId);
    // Sanity check
    assertNotNull(source);
    int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
    // Enqueue log and check if metrics updated
    Path serverLogDir = new Path(logDir, server.getServerName().toString());
    source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
    assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
      globalSource.getSizeOfLogQueue());

    // Removing the peer should reset the global metrics
    removePeerAndWait(peerId);
    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

    // Adding the same peer back again should reset the single source metrics
    addPeerAndWait(peerId, "hbase", false);
    source = manager.getSource(peerId);
    assertNotNull(source);
    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
      globalSource.getSizeOfLogQueue());
  }

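  /**
   * Verify that refreshing the sources of a peer brings the global and per-source log queue
   * metrics back to their initial values.
   */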
  @Test
  public void testDisablePeerMetricsCleanup() throws Exception {
    final String peerId = "DummyPeer";
    try {
      MetricsReplicationSourceSource globalSource = getGlobalSource();
      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
      addPeerAndWait(peerId, "hbase", false);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
      ReplicationSourceInterface source = manager.getSource(peerId);
      // Sanity check
      assertNotNull(source);
      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
      // Enqueue log and check if metrics updated
      Path serverLogDir = new Path(logDir, server.getServerName().toString());
      source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());

      // Refreshing the peer should decrement the global and single source metrics
      manager.refreshSources(peerId);
      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

      source = manager.getSource(peerId);
      assertNotNull(source);
      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
        globalSource.getSizeOfLogQueue());
    } finally {
      removePeerAndWait(peerId);
    }
  }

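  /**
   * Verify that cleaning old logs for a synchronous replication peer also removes the
   * corresponding remote WAL, and that a remote WAL which does not exist is tolerated.
   */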
  @Test
  public void testRemoveRemoteWALs() throws Exception {
    String peerId = "2";
    addPeerAndWait(peerId, "hbase", true);
    // make sure that we can deal with files which do not exist
    String walNameNotExists =
      "remoteWAL-12345-" + peerId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
    Path wal = new Path(logDir, walNameNotExists);
    manager.postLogRoll(wal);

    Path remoteLogDirForPeer = new Path(remoteLogDir, peerId);
    FS.mkdirs(remoteLogDirForPeer);
    String walName = "remoteWAL-12345-" + peerId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
    Path remoteWAL =
      new Path(remoteLogDirForPeer, walName).makeQualified(FS.getUri(), FS.getWorkingDirectory());
    FS.create(remoteWAL).close();
    wal = new Path(logDir, walName);
    manager.postLogRoll(wal);

    ReplicationSourceInterface source = manager.getSource(peerId);
    manager.cleanOldLogs(walName, true, source);
    assertFalse(FS.exists(remoteWAL));
  }

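  /**
   * Verify that a per-peer override of replication.source.sleepforretries propagates from the
   * peer configuration to the ReplicationSource, its shipper, and its WAL reader.
   */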
  @Test
  public void testPeerConfigurationOverridesPropagate() throws Exception {
    Configuration globalConf = UTIL.getConfiguration();
    long globalSleepValue = 1000L;
    globalConf.setLong("replication.source.sleepforretries", globalSleepValue);

    long peerSleepOverride = 5000L;
    String peerId = "testConfigOverridePeer";
    String clusterKey = "testPeerConfigOverride";

    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
      .putConfiguration("replication.source.sleepforretries", String.valueOf(peerSleepOverride))
      .build();

    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, peerConfig, true,
      SyncReplicationState.NONE);
    manager.addPeer(peerId);
    UTIL.waitFor(20000, () -> {
      ReplicationSourceInterface rs = manager.getSource(peerId);
      return rs != null && rs.isSourceActive();
    });

    ReplicationSource source = (ReplicationSource) manager.getSources().stream()
      .filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
    assertNotNull("Source should be created for peer", source);

    assertEquals("ReplicationSource should use peer config override for sleepForRetries",
      peerSleepOverride, source.getSleepForRetries());

    Map<String, ReplicationSourceShipper> workers = source.workerThreads;
    if (!workers.isEmpty()) {
      ReplicationSourceShipper shipper = workers.values().iterator().next();
      assertEquals("ReplicationSourceShipper should use peer config override for sleepForRetries",
        peerSleepOverride, shipper.getSleepForRetries());

      ReplicationSourceWALReader reader = shipper.entryReader;
      if (reader != null) {
        assertEquals(
          "ReplicationSourceWALReader should use peer config override for sleepForRetries",
          peerSleepOverride, reader.getSleepForRetries());
      }
    }

    removePeerAndWait(peerId);
  }

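  /**
   * Verify that a per-peer configuration override only affects its own peer: a second peer
   * without the override keeps using the value from the global configuration.
   */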
  @Test
  public void testPeerConfigurationIsolation() throws Exception {
    Configuration globalConf = UTIL.getConfiguration();
    long globalSleepValue = 1000L;
    globalConf.setLong("replication.source.sleepforretries", globalSleepValue);

    // Create first peer WITH config override
    long peerSleepOverride = 5000L;
    String peerIdWithOverride = "peerWithOverride";
    String clusterKeyWithOverride = "testPeerWithOverride";

    ReplicationPeerConfig configWithOverride = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKeyWithOverride)
      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
      .putConfiguration("replication.source.sleepforretries", String.valueOf(peerSleepOverride))
      .build();

    manager.getReplicationPeers().getPeerStorage().addPeer(peerIdWithOverride, configWithOverride,
      true, SyncReplicationState.NONE);
    manager.addPeer(peerIdWithOverride);

    // Create second peer WITHOUT config override
    String peerIdWithoutOverride = "peerWithoutOverride";
    String clusterKeyWithoutOverride = "testPeerWithoutOverride";
    addPeerAndWait(peerIdWithoutOverride, clusterKeyWithoutOverride, false);

    // Wait for both peers to be active
    UTIL.waitFor(20000, () -> {
      ReplicationSourceInterface rs1 = manager.getSource(peerIdWithOverride);
      ReplicationSourceInterface rs2 = manager.getSource(peerIdWithoutOverride);
      return rs1 != null && rs1.isSourceActive() && rs2 != null && rs2.isSourceActive();
    });

    // Verify peer with override uses the override value
    ReplicationSource sourceWithOverride = (ReplicationSource) manager.getSources().stream()
      .filter(s -> s.getPeerId().equals(peerIdWithOverride)).findFirst().orElse(null);
    assertNotNull("Source with override should be created", sourceWithOverride);
    assertEquals("Peer with override should use override value", peerSleepOverride,
      sourceWithOverride.getSleepForRetries());

    // Verify peer without override uses global config
    ReplicationSource sourceWithoutOverride = (ReplicationSource) manager.getSources().stream()
      .filter(s -> s.getPeerId().equals(peerIdWithoutOverride)).findFirst().orElse(null);
    assertNotNull("Source without override should be created", sourceWithoutOverride);
    assertEquals("Peer without override should use global config", globalSleepValue,
      sourceWithoutOverride.getSleepForRetries());

    removePeerAndWait(peerIdWithOverride);
    removePeerAndWait(peerIdWithoutOverride);
  }
}