001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.hasItems;
022import static org.hamcrest.Matchers.hasSize;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.Collections;
031import java.util.NavigableMap;
032import java.util.Set;
033import java.util.TreeMap;
034import java.util.stream.Collectors;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellBuilderFactory;
040import org.apache.hadoop.hbase.CellBuilderType;
041import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
051import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
052import org.apache.hadoop.hbase.replication.ReplicationException;
053import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
054import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
055import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
056import org.apache.hadoop.hbase.replication.ReplicationPeers;
057import org.apache.hadoop.hbase.replication.ReplicationQueueId;
058import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
059import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
060import org.apache.hadoop.hbase.replication.ReplicationUtils;
061import org.apache.hadoop.hbase.replication.SyncReplicationState;
062import org.apache.hadoop.hbase.testclassification.MediumTests;
063import org.apache.hadoop.hbase.testclassification.ReplicationTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.CommonFSUtils;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.wal.WAL;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALFactory;
070import org.apache.hadoop.hbase.wal.WALKeyImpl;
071import org.hamcrest.Matchers;
072import org.junit.After;
073import org.junit.AfterClass;
074import org.junit.Before;
075import org.junit.BeforeClass;
076import org.junit.ClassRule;
077import org.junit.Rule;
078import org.junit.Test;
079import org.junit.experimental.categories.Category;
080import org.junit.rules.TestName;
081
082import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
083
084@Category({ ReplicationTests.class, MediumTests.class })
085public class TestReplicationSourceManager {
086
  // JUnit class rule built by HBaseClassTestRule for this test class.
  // NOTE(review): presumably this is what applies the per-category limits of the test
  // classification declared on the class - confirm against HBaseClassTestRule.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);
090
091  public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint {
092
093    private String clusterKey;
094
095    @Override
096    public boolean replicate(ReplicateContext replicateContext) {
097      // if you want to block the replication, for example, do not want the recovered source to be
098      // removed
099      if (clusterKey.endsWith("error")) {
100        throw new RuntimeException("Inject error");
101      }
102      return true;
103    }
104
105    @Override
106    public void init(Context context) throws IOException {
107      super.init(context);
108      this.clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey();
109    }
110
111  }
112
  /** Testing utility whose mini cluster is started once for the whole class. */
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /**
   * Copy of the utility's configuration created in setUpBeforeClass; setUp rewrites its root
   * directory and replication queue table name for every test method.
   */
  private static Configuration CONF;

  /** Test file system obtained from {@link #UTIL}. */
  private static FileSystem FS;

  /** Used as row, family, qualifier and value of the first cell written by createWALFile. */
  private static final byte[] F1 = Bytes.toBytes("f1");

  /** Used as row, family, qualifier and value of the second cell written by createWALFile. */
  private static final byte[] F2 = Bytes.toBytes("f2");

  /** Table the WAL entries and the sync replication peer config refer to. */
  private static final TableName TABLE_NAME = TableName.valueOf("test");

  /** Region info built for {@link #TABLE_NAME}; its encoded name goes into the WAL key. */
  private static RegionInfo RI;

  /** Scope map passed to the WAL key: {@link #F1} maps to 1 and {@link #F2} maps to 0. */
  private static NavigableMap<byte[], Integer> SCOPES;

  /** Supplies the running method's name, used for the per-test root dir and queue table. */
  @Rule
  public final TestName name = new TestName();

  /** Old WAL directory under the per-test root directory. */
  private Path oldLogDir;

  /** WAL directory under the per-test root directory. */
  private Path logDir;

  /** Remote WAL directory under the per-test root directory. */
  private Path remoteLogDir;

  /** Mockito mock stubbed in setUp with configuration, ZooKeeper, connection and server name. */
  private Server server;

  /** Replication service created for each test and stopped in tearDown. */
  private Replication replication;

  /** Manager obtained from {@link #replication}; the object under test. */
  private ReplicationSourceManager manager;
143
144  @BeforeClass
145  public static void setUpBeforeClass() throws Exception {
146    UTIL.startMiniCluster(1);
147    FS = UTIL.getTestFileSystem();
148    CONF = new Configuration(UTIL.getConfiguration());
149    CONF.setLong("replication.sleep.before.failover", 0);
150
151    RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
152    SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
153    SCOPES.put(F1, 1);
154    SCOPES.put(F2, 0);
155  }
156
  /**
   * Shuts down the mini cluster of {@link #UTIL} once all test methods have run.
   */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniCluster();
  }
161
162  @Before
163  public void setUp() throws Exception {
164    Path rootDir = UTIL.getDataTestDirOnTestFS(name.getMethodName());
165    CommonFSUtils.setRootDir(CONF, rootDir);
166    server = mock(Server.class);
167    when(server.getConfiguration()).thenReturn(CONF);
168    when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
169    when(server.getConnection()).thenReturn(UTIL.getConnection());
170    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
171    when(server.getServerName()).thenReturn(sn);
172    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
173    FS.mkdirs(oldLogDir);
174    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
175    FS.mkdirs(logDir);
176    remoteLogDir = new Path(rootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
177    FS.mkdirs(remoteLogDir);
178    TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
179    UTIL.getAdmin()
180      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
181    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
182
183    replication = new Replication();
184    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
185      new WALFactory(CONF, server.getServerName(), null));
186    manager = replication.getReplicationManager();
187  }
188
189  @After
190  public void tearDown() {
191    replication.stopReplicationService();
192  }
193
194  /**
195   * Add a peer and wait for it to initialize
196   */
197  private void addPeerAndWait(String peerId, String clusterKey, boolean syncRep)
198    throws ReplicationException, IOException {
199    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
200      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
201      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
202    if (syncRep) {
203      builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList()))
204        .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString());
205    }
206
207    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, builder.build(), true,
208      syncRep ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE);
209    manager.addPeer(peerId);
210    UTIL.waitFor(20000, () -> {
211      ReplicationSourceInterface rs = manager.getSource(peerId);
212      return rs != null && rs.isSourceActive();
213    });
214  }
215
216  /**
217   * Remove a peer and wait for it to get cleaned up
218   */
219  private void removePeerAndWait(String peerId) throws Exception {
220    ReplicationPeers rp = manager.getReplicationPeers();
221    rp.getPeerStorage().removePeer(peerId);
222    manager.removePeer(peerId);
223    UTIL.waitFor(20000, () -> {
224      if (rp.getPeer(peerId) != null) {
225        return false;
226      }
227      if (manager.getSource(peerId) != null) {
228        return false;
229      }
230      return manager.getOldSources().stream().noneMatch(rs -> rs.getPeerId().equals(peerId));
231    });
232  }
233
234  private void createWALFile(Path file) throws Exception {
235    ProtobufLogWriter writer = new ProtobufLogWriter();
236    try {
237      writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file), null);
238      WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME,
239        EnvironmentEdgeManager.currentTime(), SCOPES);
240      WALEdit edit = new WALEdit();
241      edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1).setFamily(F1)
242        .setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build());
243      edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2).setFamily(F2)
244        .setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build());
245      writer.append(new WAL.Entry(key, edit));
246      writer.sync(false);
247    } finally {
248      writer.close();
249    }
250  }
251
252  @Test
253  public void testClaimQueue() throws Exception {
254    String peerId = "1";
255    addPeerAndWait(peerId, "error", false);
256    ServerName serverName = ServerName.valueOf("hostname0.example.org", 12345, 123);
257    String walName1 = serverName.toString() + ".1";
258    createWALFile(new Path(oldLogDir, walName1));
259    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
260    ReplicationQueueStorage queueStorage = manager.getQueueStorage();
261    queueStorage.setOffset(queueId, "", new ReplicationGroupOffset(peerId, 0),
262      Collections.emptyMap());
263    manager.claimQueue(queueId);
264    assertThat(manager.getOldSources(), hasSize(1));
265  }
266
267  @Test
268  public void testSameWALPrefix() throws IOException {
269    String walName1 = "localhost,8080,12345-45678-Peer.34567";
270    String walName2 = "localhost,8080,12345.56789";
271    manager.postLogRoll(new Path(walName1));
272    manager.postLogRoll(new Path(walName2));
273
274    Set<String> latestWals =
275      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
276    assertThat(latestWals,
277      Matchers.<Set<String>> both(hasSize(2)).and(hasItems(walName1, walName2)));
278  }
279
280  private MetricsReplicationSourceSource getGlobalSource() {
281    return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
282      .getGlobalSource();
283  }
284
285  @Test
286  public void testRemovePeerMetricsCleanup() throws Exception {
287    MetricsReplicationSourceSource globalSource = getGlobalSource();
288    int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
289    String peerId = "DummyPeer";
290    addPeerAndWait(peerId, "hbase", false);
291    // there is no latestPaths so the size of log queue should not change
292    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
293
294    ReplicationSourceInterface source = manager.getSource(peerId);
295    // Sanity check
296    assertNotNull(source);
297    int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
298    // Enqueue log and check if metrics updated
299    Path serverLogDir = new Path(logDir, server.getServerName().toString());
300    source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
301    assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
302    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
303      globalSource.getSizeOfLogQueue());
304
305    // Removing the peer should reset the global metrics
306    removePeerAndWait(peerId);
307    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
308
309    // Adding the same peer back again should reset the single source metrics
310    addPeerAndWait(peerId, "hbase", false);
311    source = manager.getSource(peerId);
312    assertNotNull(source);
313    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
314      globalSource.getSizeOfLogQueue());
315  }
316
317  @Test
318  public void testDisablePeerMetricsCleanup() throws Exception {
319    final String peerId = "DummyPeer";
320    try {
321      MetricsReplicationSourceSource globalSource = getGlobalSource();
322      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
323      addPeerAndWait(peerId, "hbase", false);
324      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
325      ReplicationSourceInterface source = manager.getSource(peerId);
326      // Sanity check
327      assertNotNull(source);
328      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
329      // Enqueue log and check if metrics updated
330      Path serverLogDir = new Path(logDir, server.getServerName().toString());
331      source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
332      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
333      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
334        globalSource.getSizeOfLogQueue());
335
336      // Refreshing the peer should decrement the global and single source metrics
337      manager.refreshSources(peerId);
338      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
339
340      source = manager.getSource(peerId);
341      assertNotNull(source);
342      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
343      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
344        globalSource.getSizeOfLogQueue());
345    } finally {
346      removePeerAndWait(peerId);
347    }
348  }
349
350  @Test
351  public void testRemoveRemoteWALs() throws Exception {
352    String peerId = "2";
353    addPeerAndWait(peerId, "hbase", true);
354    // make sure that we can deal with files which does not exist
355    String walNameNotExists =
356      "remoteWAL-12345-" + peerId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
357    Path wal = new Path(logDir, walNameNotExists);
358    manager.postLogRoll(wal);
359
360    Path remoteLogDirForPeer = new Path(remoteLogDir, peerId);
361    FS.mkdirs(remoteLogDirForPeer);
362    String walName = "remoteWAL-12345-" + peerId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
363    Path remoteWAL =
364      new Path(remoteLogDirForPeer, walName).makeQualified(FS.getUri(), FS.getWorkingDirectory());
365    FS.create(remoteWAL).close();
366    wal = new Path(logDir, walName);
367    manager.postLogRoll(wal);
368
369    ReplicationSourceInterface source = manager.getSource(peerId);
370    manager.cleanOldLogs(walName, true, source);
371    assertFalse(FS.exists(remoteWAL));
372  }
373}