001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.doNothing;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.spy;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.UUID;
032import java.util.concurrent.Callable;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicReference;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.Waiter;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.regionserver.HRegion;
045import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
046import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
047import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
048import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
050import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.ReplicationTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
055import org.apache.hadoop.hbase.util.Threads;
056import org.apache.hadoop.hbase.wal.WAL.Entry;
057import org.apache.hadoop.hbase.zookeeper.ZKConfig;
058import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
059import org.junit.AfterClass;
060import org.junit.Assert;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
/**
 * Tests the interactions between ReplicationSource and custom ReplicationEndpoint
 * implementations.
 */
072@Category({ ReplicationTests.class, MediumTests.class })
073public class TestReplicationEndpoint extends TestReplicationBase {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
080
081  static int numRegionServers;
082
083  @BeforeClass
084  public static void setUpBeforeClass() throws Exception {
085    TestReplicationBase.setUpBeforeClass();
086    numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
087  }
088
089  @AfterClass
090  public static void tearDownAfterClass() throws Exception {
091    TestReplicationBase.tearDownAfterClass();
092    // check stop is called
093    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
094  }
095
096  @Before
097  public void setup() throws Exception {
098    ReplicationEndpointForTest.contructedCount.set(0);
099    ReplicationEndpointForTest.startedCount.set(0);
100    ReplicationEndpointForTest.replicateCount.set(0);
101    ReplicationEndpointReturningFalse.replicated.set(false);
102    ReplicationEndpointForTest.lastEntries = null;
103    final List<RegionServerThread> rsThreads =
104        utility1.getMiniHBaseCluster().getRegionServerThreads();
105    for (RegionServerThread rs : rsThreads) {
106      utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
107    }
108    // Wait for  all log roll to finish
109    utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
110      @Override
111      public boolean evaluate() throws Exception {
112        for (RegionServerThread rs : rsThreads) {
113          if (!rs.getRegionServer().walRollRequestFinished()) {
114            return false;
115          }
116        }
117        return true;
118      }
119
120      @Override
121      public String explainFailure() throws Exception {
122        List<String> logRollInProgressRsList = new ArrayList<>();
123        for (RegionServerThread rs : rsThreads) {
124          if (!rs.getRegionServer().walRollRequestFinished()) {
125            logRollInProgressRsList.add(rs.getRegionServer().toString());
126          }
127        }
128        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
129      }
130    });
131  }
132
133  @Test
134  public void testCustomReplicationEndpoint() throws Exception {
135    // test installing a custom replication endpoint other than the default one.
136    admin.addPeer("testCustomReplicationEndpoint",
137        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
138            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
139
140    // check whether the class has been constructed and started
141    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
142      @Override
143      public boolean evaluate() throws Exception {
144        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
145      }
146    });
147
148    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
149      @Override
150      public boolean evaluate() throws Exception {
151        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
152      }
153    });
154
155    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
156
157    // now replicate some data.
158    doPut(Bytes.toBytes("row42"));
159
160    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
161      @Override
162      public boolean evaluate() throws Exception {
163        return ReplicationEndpointForTest.replicateCount.get() >= 1;
164      }
165    });
166
167    doAssert(Bytes.toBytes("row42"));
168
169    admin.removePeer("testCustomReplicationEndpoint");
170  }
171
172  @Test
173  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
174    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
175    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
176    int peerCount = admin.getPeersCount();
177    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
178    admin.addPeer(id,
179      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
180        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
181    // This test is flakey and then there is so much stuff flying around in here its, hard to
182    // debug.  Peer needs to be up for the edit to make it across. This wait on
183    // peer count seems to be a hack that has us not progress till peer is up.
184    if (admin.getPeersCount() <= peerCount) {
185      LOG.info("Waiting on peercount to go up from " + peerCount);
186      Threads.sleep(100);
187    }
188    // now replicate some data
189    doPut(row);
190
191    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
192      @Override
193      public boolean evaluate() throws Exception {
194        // Looks like replication endpoint returns false unless we put more than 10 edits. We
195        // only send over one edit.
196        int count = ReplicationEndpointForTest.replicateCount.get();
197        LOG.info("count=" + count);
198        return ReplicationEndpointReturningFalse.replicated.get();
199      }
200    });
201    if (ReplicationEndpointReturningFalse.ex.get() != null) {
202      throw ReplicationEndpointReturningFalse.ex.get();
203    }
204
205    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
206  }
207
208  @Test
209  public void testInterClusterReplication() throws Exception {
210    final String id = "testInterClusterReplication";
211
212    List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
213    int totEdits = 0;
214
215    // Make sure edits are spread across regions because we do region based batching
216    // before shipping edits.
217    for(HRegion region: regions) {
218      RegionInfo hri = region.getRegionInfo();
219      byte[] row = hri.getStartKey();
220      for (int i = 0; i < 100; i++) {
221        if (row.length > 0) {
222          Put put = new Put(row);
223          put.addColumn(famName, row, row);
224          region.put(put);
225          totEdits++;
226        }
227      }
228    }
229
230    admin.addPeer(id,
231        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
232            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
233        null);
234
235    final int numEdits = totEdits;
236    Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
237      @Override
238      public boolean evaluate() throws Exception {
239        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
240      }
241
242      @Override
243      public String explainFailure() throws Exception {
244        String failure = "Failed to replicate all edits, expected = " + numEdits
245            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
246        return failure;
247      }
248    });
249
250    admin.removePeer("testInterClusterReplication");
251    utility1.deleteTableData(tableName);
252  }
253
254  @Test
255  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
256    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
257        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
258    //test that we can create mutliple WALFilters reflectively
259    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
260        EverythingPassesWALEntryFilter.class.getName() +
261            "," + EverythingPassesWALEntryFilterSubclass.class.getName());
262    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
263    // now replicate some data.
264    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
265      doPut(connection, Bytes.toBytes("row1"));
266      doPut(connection, row);
267      doPut(connection, Bytes.toBytes("row2"));
268    }
269
270    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
271      @Override
272      public boolean evaluate() throws Exception {
273        return ReplicationEndpointForTest.replicateCount.get() >= 1;
274      }
275    });
276
277    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
278    //make sure our reflectively created filter is in the filter chain
279    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
280    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
281  }
282
283  @Test (expected=IOException.class)
284  public void testWALEntryFilterAddValidation() throws Exception {
285    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
286        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
287    //test that we can create mutliple WALFilters reflectively
288    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
289        "IAmNotARealWalEntryFilter");
290    admin.addPeer("testWALEntryFilterAddValidation", rpc);
291  }
292
293  @Test (expected=IOException.class)
294  public void testWALEntryFilterUpdateValidation() throws Exception {
295    ReplicationPeerConfig rpc =  new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
296        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
297    //test that we can create mutliple WALFilters reflectively
298    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
299        "IAmNotARealWalEntryFilter");
300    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
301  }
302
303
304  @Test
305  public void testMetricsSourceBaseSourcePassthrough(){
306    /*
307    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
308    and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
309    Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
310    allows for custom JMX metrics.
311    This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
312    the two layers of wrapping to the actual BaseSource.
313    */
314    String id = "id";
315    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
316    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
317    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
318    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
319    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
320
321
322    MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
323    MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
324    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
325    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
326
327    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
328    MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
329        singleSourceSourceByTable);
330
331
332    String gaugeName = "gauge";
333    String singleGaugeName = "source.id." + gaugeName;
334    String globalGaugeName = "source." + gaugeName;
335    long delta = 1;
336    String counterName = "counter";
337    String singleCounterName = "source.id." + counterName;
338    String globalCounterName = "source." + counterName;
339    long count = 2;
340    source.decGauge(gaugeName, delta);
341    source.getMetricsContext();
342    source.getMetricsDescription();
343    source.getMetricsJmxContext();
344    source.getMetricsName();
345    source.incCounters(counterName, count);
346    source.incGauge(gaugeName, delta);
347    source.init();
348    source.removeMetric(gaugeName);
349    source.setGauge(gaugeName, delta);
350    source.updateHistogram(counterName, count);
351    source.incrFailedRecoveryQueue();
352
353
354    verify(singleRms).decGauge(singleGaugeName, delta);
355    verify(globalRms).decGauge(globalGaugeName, delta);
356    verify(globalRms).getMetricsContext();
357    verify(globalRms).getMetricsJmxContext();
358    verify(globalRms).getMetricsName();
359    verify(singleRms).incCounters(singleCounterName, count);
360    verify(globalRms).incCounters(globalCounterName, count);
361    verify(singleRms).incGauge(singleGaugeName, delta);
362    verify(globalRms).incGauge(globalGaugeName, delta);
363    verify(globalRms).init();
364    verify(singleRms).removeMetric(singleGaugeName);
365    verify(globalRms).removeMetric(globalGaugeName);
366    verify(singleRms).setGauge(singleGaugeName, delta);
367    verify(globalRms).setGauge(globalGaugeName, delta);
368    verify(singleRms).updateHistogram(singleCounterName, count);
369    verify(globalRms).updateHistogram(globalCounterName, count);
370    verify(spyglobalSourceSource).incrFailedRecoveryQueue();
371
372    //check singleSourceSourceByTable metrics.
373    // singleSourceSourceByTable map entry will be created only
374    // after calling #setAgeOfLastShippedOpByTable
375    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
376        .containsKey("RandomNewTable");
377    Assert.assertEquals(false, containsRandomNewTable);
378    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
379    containsRandomNewTable = source.getSingleSourceSourceByTable()
380        .containsKey("RandomNewTable");
381    Assert.assertEquals(true, containsRandomNewTable);
382    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
383        .get("RandomNewTable");
384    // cannot put more concreate value here to verify because the age is arbitrary.
385    // as long as it's greater than 0, we see it as correct answer.
386    Assert.assertTrue(msr.getLastShippedAge() > 0);
387
388  }
389
390  private void doPut(byte[] row) throws IOException {
391    try (Connection connection = ConnectionFactory.createConnection(conf1)) {
392      doPut(connection, row);
393    }
394  }
395
396  private void doPut(final Connection connection, final byte [] row) throws IOException {
397    try (Table t = connection.getTable(tableName)) {
398      Put put = new Put(row);
399      put.addColumn(famName, row, row);
400      t.put(put);
401    }
402  }
403
404  private static void doAssert(byte[] row) throws Exception {
405    if (ReplicationEndpointForTest.lastEntries == null) {
406      return; // first call
407    }
408    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
409    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
410    Assert.assertEquals(1, cells.size());
411    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
412      cells.get(0).getRowLength(), row, 0, row.length));
413  }
414
415  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
416    static UUID uuid = utility1.getRandomUUID();
417    static AtomicInteger contructedCount = new AtomicInteger();
418    static AtomicInteger startedCount = new AtomicInteger();
419    static AtomicInteger stoppedCount = new AtomicInteger();
420    static AtomicInteger replicateCount = new AtomicInteger();
421    static volatile List<Entry> lastEntries = null;
422
423    public ReplicationEndpointForTest() {
424      replicateCount.set(0);
425      contructedCount.incrementAndGet();
426    }
427
428    @Override
429    public UUID getPeerUUID() {
430      return uuid;
431    }
432
433    @Override
434    public boolean replicate(ReplicateContext replicateContext) {
435      replicateCount.incrementAndGet();
436      lastEntries = new ArrayList<>(replicateContext.entries);
437      return true;
438    }
439
440    @Override
441    public void start() {
442      startAsync();
443    }
444
445    @Override
446    public void stop() {
447      stopAsync();
448    }
449
450    @Override
451    protected void doStart() {
452      startedCount.incrementAndGet();
453      notifyStarted();
454    }
455
456    @Override
457    protected void doStop() {
458      stoppedCount.incrementAndGet();
459      notifyStopped();
460    }
461  }
462
463  public static class InterClusterReplicationEndpointForTest
464      extends HBaseInterClusterReplicationEndpoint {
465
466    static AtomicInteger replicateCount = new AtomicInteger();
467    static boolean failedOnce;
468
469    public InterClusterReplicationEndpointForTest() {
470      replicateCount.set(0);
471    }
472
473    @Override
474    public boolean replicate(ReplicateContext replicateContext) {
475      boolean success = super.replicate(replicateContext);
476      if (success) {
477        replicateCount.addAndGet(replicateContext.entries.size());
478      }
479      return success;
480    }
481
482    @Override
483    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
484      // Fail only once, we don't want to slow down the test.
485      if (failedOnce) {
486        return () -> ordinal;
487      } else {
488        failedOnce = true;
489        return () -> {
490          throw new IOException("Sample Exception: Failed to replicate.");
491        };
492      }
493    }
494  }
495
496  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
497    static int COUNT = 10;
498    static AtomicReference<Exception> ex = new AtomicReference<>(null);
499    static AtomicBoolean replicated = new AtomicBoolean(false);
500    @Override
501    public boolean replicate(ReplicateContext replicateContext) {
502      try {
503        // check row
504        doAssert(row);
505      } catch (Exception e) {
506        ex.set(e);
507      }
508
509      super.replicate(replicateContext);
510      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
511
512      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
513      return replicated.get();
514    }
515  }
516
517  // return a WALEntry filter which only accepts "row", but not other rows
518  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
519    static AtomicReference<Exception> ex = new AtomicReference<>(null);
520
521    @Override
522    public boolean replicate(ReplicateContext replicateContext) {
523      try {
524        super.replicate(replicateContext);
525        doAssert(row);
526      } catch (Exception e) {
527        ex.set(e);
528      }
529      return true;
530    }
531
532    @Override
533    public WALEntryFilter getWALEntryfilter() {
534      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
535        @Override
536        public Entry filter(Entry entry) {
537          ArrayList<Cell> cells = entry.getEdit().getCells();
538          int size = cells.size();
539          for (int i = size-1; i >= 0; i--) {
540            Cell cell = cells.get(i);
541            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
542              row, 0, row.length)) {
543              cells.remove(i);
544            }
545          }
546          return entry;
547        }
548      });
549    }
550  }
551
552  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
553    private static boolean passedEntry = false;
554    @Override
555    public Entry filter(Entry entry) {
556      passedEntry = true;
557      return entry;
558    }
559
560    public static boolean hasPassedAnEntry(){
561      return passedEntry;
562    }
563  }
564
565  public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
566
567  }
568}