001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.doNothing;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.spy;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.UUID;
032import java.util.concurrent.Callable;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.concurrent.atomic.AtomicReference;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.Waiter;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.regionserver.HRegion;
045import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
046import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
047import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
048import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
050import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.testclassification.ReplicationTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
055import org.apache.hadoop.hbase.util.Threads;
056import org.apache.hadoop.hbase.wal.WAL.Entry;
057import org.apache.hadoop.hbase.zookeeper.ZKConfig;
058import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
059import org.junit.AfterClass;
060import org.junit.Assert;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069/**
070 * Tests ReplicationSource and ReplicationEndpoint interactions
071 */
072@Category({ ReplicationTests.class, MediumTests.class })
073public class TestReplicationEndpoint extends TestReplicationBase {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077      HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
080
081  static int numRegionServers;
082
083  @BeforeClass
084  public static void setUpBeforeClass() throws Exception {
085    TestReplicationBase.setUpBeforeClass();
086    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
087  }
088
089  @AfterClass
090  public static void tearDownAfterClass() throws Exception {
091    TestReplicationBase.tearDownAfterClass();
092    // check stop is called
093    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
094  }
095
096  @Before
097  public void setup() throws Exception {
098    ReplicationEndpointForTest.contructedCount.set(0);
099    ReplicationEndpointForTest.startedCount.set(0);
100    ReplicationEndpointForTest.replicateCount.set(0);
101    ReplicationEndpointReturningFalse.replicated.set(false);
102    ReplicationEndpointForTest.lastEntries = null;
103    final List<RegionServerThread> rsThreads =
104        UTIL1.getMiniHBaseCluster().getRegionServerThreads();
105    for (RegionServerThread rs : rsThreads) {
106      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
107    }
108    // Wait for  all log roll to finish
109    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
110      @Override
111      public boolean evaluate() throws Exception {
112        for (RegionServerThread rs : rsThreads) {
113          if (!rs.getRegionServer().walRollRequestFinished()) {
114            return false;
115          }
116        }
117        return true;
118      }
119
120      @Override
121      public String explainFailure() throws Exception {
122        List<String> logRollInProgressRsList = new ArrayList<>();
123        for (RegionServerThread rs : rsThreads) {
124          if (!rs.getRegionServer().walRollRequestFinished()) {
125            logRollInProgressRsList.add(rs.getRegionServer().toString());
126          }
127        }
128        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
129      }
130    });
131  }
132
133  @Test
134  public void testCustomReplicationEndpoint() throws Exception {
135    // test installing a custom replication endpoint other than the default one.
136    hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
137        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
138            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()));
139
140    // check whether the class has been constructed and started
141    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
142      @Override
143      public boolean evaluate() throws Exception {
144        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
145      }
146    });
147
148    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
149      @Override
150      public boolean evaluate() throws Exception {
151        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
152      }
153    });
154
155    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
156
157    // now replicate some data.
158    doPut(Bytes.toBytes("row42"));
159
160    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
161      @Override
162      public boolean evaluate() throws Exception {
163        return ReplicationEndpointForTest.replicateCount.get() >= 1;
164      }
165    });
166
167    doAssert(Bytes.toBytes("row42"));
168
169    hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
170  }
171
172  @Test
173  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
174    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
175    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
176    int peerCount = hbaseAdmin.listReplicationPeers().size();
177    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
178    hbaseAdmin.addReplicationPeer(id,
179      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
180        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()));
181    // This test is flakey and then there is so much stuff flying around in here its, hard to
182    // debug.  Peer needs to be up for the edit to make it across. This wait on
183    // peer count seems to be a hack that has us not progress till peer is up.
184    if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
185      LOG.info("Waiting on peercount to go up from " + peerCount);
186      Threads.sleep(100);
187    }
188    // now replicate some data
189    doPut(row);
190
191    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
192      @Override
193      public boolean evaluate() throws Exception {
194        // Looks like replication endpoint returns false unless we put more than 10 edits. We
195        // only send over one edit.
196        int count = ReplicationEndpointForTest.replicateCount.get();
197        LOG.info("count=" + count);
198        return ReplicationEndpointReturningFalse.replicated.get();
199      }
200    });
201    if (ReplicationEndpointReturningFalse.ex.get() != null) {
202      throw ReplicationEndpointReturningFalse.ex.get();
203    }
204
205    hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
206  }
207
208  @Test
209  public void testInterClusterReplication() throws Exception {
210    final String id = "testInterClusterReplication";
211
212    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
213    int totEdits = 0;
214
215    // Make sure edits are spread across regions because we do region based batching
216    // before shipping edits.
217    for(HRegion region: regions) {
218      RegionInfo hri = region.getRegionInfo();
219      byte[] row = hri.getStartKey();
220      for (int i = 0; i < 100; i++) {
221        if (row.length > 0) {
222          Put put = new Put(row);
223          put.addColumn(famName, row, row);
224          region.put(put);
225          totEdits++;
226        }
227      }
228    }
229
230    hbaseAdmin.addReplicationPeer(id,
231        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
232            .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()));
233
234    final int numEdits = totEdits;
235    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
236      @Override
237      public boolean evaluate() throws Exception {
238        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
239      }
240
241      @Override
242      public String explainFailure() throws Exception {
243        String failure = "Failed to replicate all edits, expected = " + numEdits
244            + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
245        return failure;
246      }
247    });
248
249    hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
250    UTIL1.deleteTableData(tableName);
251  }
252
253  @Test
254  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
255    ReplicationPeerConfig rpc =
256      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
257        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
258    // test that we can create mutliple WALFilters reflectively
259    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
260      EverythingPassesWALEntryFilter.class.getName() + "," +
261        EverythingPassesWALEntryFilterSubclass.class.getName());
262    hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
263    // now replicate some data.
264    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
265      doPut(connection, Bytes.toBytes("row1"));
266      doPut(connection, row);
267      doPut(connection, Bytes.toBytes("row2"));
268    }
269
270    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
271      @Override
272      public boolean evaluate() throws Exception {
273        return ReplicationEndpointForTest.replicateCount.get() >= 1;
274      }
275    });
276
277    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
278    //make sure our reflectively created filter is in the filter chain
279    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
280    hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
281  }
282
283  @Test(expected = IOException.class)
284  public void testWALEntryFilterAddValidation() throws Exception {
285    ReplicationPeerConfig rpc =
286      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
287        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
288    // test that we can create mutliple WALFilters reflectively
289    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
290      "IAmNotARealWalEntryFilter");
291    hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
292  }
293
294  @Test(expected = IOException.class)
295  public void testWALEntryFilterUpdateValidation() throws Exception {
296    ReplicationPeerConfig rpc =
297      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
298        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
299    // test that we can create mutliple WALFilters reflectively
300    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
301      "IAmNotARealWalEntryFilter");
302    hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
303  }
304
  @Test
  public void testMetricsSourceBaseSourcePassthrough() {
    /*
     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
     * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
     * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
     * MetricsSource actually calls down through the two layers of wrapping to the actual
     * BaseSource.
     */
    String id = "id";
    // Both wrapped sources share one mocked registry; only the registry getter is stubbed.
    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);

    MetricsReplicationSourceSource singleSourceSource =
      new MetricsReplicationSourceSourceImpl(singleRms, id);
    MetricsReplicationSourceSource globalSourceSource =
      new MetricsReplicationGlobalSourceSource(globalRms);
    // Spy the global source so incrFailedRecoveryQueue can be verified without side effects.
    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();

    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
    MetricsSource source =
      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);


    // Metric names as they should appear in the per-peer ("source.id.") and
    // global ("source.") namespaces after passing through the decorator.
    String gaugeName = "gauge";
    String singleGaugeName = "source.id." + gaugeName;
    String globalGaugeName = "source." + gaugeName;
    long delta = 1;
    String counterName = "counter";
    String singleCounterName = "source.id." + counterName;
    String globalCounterName = "source." + counterName;
    long count = 2;
    // Exercise every BaseSource pass-through method once.
    source.decGauge(gaugeName, delta);
    source.getMetricsContext();
    source.getMetricsDescription();
    source.getMetricsJmxContext();
    source.getMetricsName();
    source.incCounters(counterName, count);
    source.incGauge(gaugeName, delta);
    source.init();
    source.removeMetric(gaugeName);
    source.setGauge(gaugeName, delta);
    source.updateHistogram(counterName, count);
    source.incrFailedRecoveryQueue();


    // Each call must have reached BOTH underlying BaseSources with the
    // namespace-qualified metric name (getters are only verified on the global one).
    verify(singleRms).decGauge(singleGaugeName, delta);
    verify(globalRms).decGauge(globalGaugeName, delta);
    verify(globalRms).getMetricsContext();
    verify(globalRms).getMetricsJmxContext();
    verify(globalRms).getMetricsName();
    verify(singleRms).incCounters(singleCounterName, count);
    verify(globalRms).incCounters(globalCounterName, count);
    verify(singleRms).incGauge(singleGaugeName, delta);
    verify(globalRms).incGauge(globalGaugeName, delta);
    verify(globalRms).init();
    verify(singleRms).removeMetric(singleGaugeName);
    verify(globalRms).removeMetric(globalGaugeName);
    verify(singleRms).setGauge(singleGaugeName, delta);
    verify(globalRms).setGauge(globalGaugeName, delta);
    verify(singleRms).updateHistogram(singleCounterName, count);
    verify(globalRms).updateHistogram(globalCounterName, count);
    verify(spyglobalSourceSource).incrFailedRecoveryQueue();

    //check singleSourceSourceByTable metrics.
    // singleSourceSourceByTable map entry will be created only
    // after calling #setAgeOfLastShippedOpByTable
    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
        .containsKey("RandomNewTable");
    Assert.assertEquals(false, containsRandomNewTable);
    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
    containsRandomNewTable = source.getSingleSourceSourceByTable()
        .containsKey("RandomNewTable");
    Assert.assertEquals(true, containsRandomNewTable);
    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
        .get("RandomNewTable");
    // cannot put more concrete value here to verify because the age is arbitrary.
    // as long as it's greater than 0, we see it as correct answer.
    Assert.assertTrue(msr.getLastShippedAge() > 0);

  }
391
392  private void doPut(byte[] row) throws IOException {
393    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
394      doPut(connection, row);
395    }
396  }
397
398  private void doPut(final Connection connection, final byte [] row) throws IOException {
399    try (Table t = connection.getTable(tableName)) {
400      Put put = new Put(row);
401      put.addColumn(famName, row, row);
402      t.put(put);
403    }
404  }
405
406  private static void doAssert(byte[] row) throws Exception {
407    if (ReplicationEndpointForTest.lastEntries == null) {
408      return; // first call
409    }
410    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
411    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
412    Assert.assertEquals(1, cells.size());
413    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
414      cells.get(0).getRowLength(), row, 0, row.length));
415  }
416
  /**
   * Minimal in-JVM replication endpoint used by most tests in this class. It does not
   * ship edits anywhere; it only counts lifecycle events and captures the last batch of
   * entries handed to {@link #replicate(ReplicateContext)} for later assertions.
   */
  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
    // Stable fake peer-cluster id returned by getPeerUUID().
    static UUID uuid = UTIL1.getRandomUUID();
    // Counts constructor invocations across all regionservers (the "contructed"
    // typo is kept: the field name is referenced by the tests above).
    static AtomicInteger contructedCount = new AtomicInteger();
    // Count of doStart() / doStop() lifecycle callbacks across all instances.
    static AtomicInteger startedCount = new AtomicInteger();
    static AtomicInteger stoppedCount = new AtomicInteger();
    // Number of replicate() calls; reset both here and in the test setup().
    static AtomicInteger replicateCount = new AtomicInteger();
    // Last batch passed to replicate(); volatile so the JUnit thread sees writes
    // made by the replication source thread.
    static volatile List<Entry> lastEntries = null;

    public ReplicationEndpointForTest() {
      replicateCount.set(0);
      contructedCount.incrementAndGet();
    }

    @Override
    public UUID getPeerUUID() {
      return uuid;
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // Record the call and snapshot the batch; always report success.
      replicateCount.incrementAndGet();
      lastEntries = new ArrayList<>(replicateContext.entries);
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      startedCount.incrementAndGet();
      notifyStarted();
    }

    @Override
    protected void doStop() {
      stoppedCount.incrementAndGet();
      notifyStopped();
    }
  }
464
465  public static class InterClusterReplicationEndpointForTest
466      extends HBaseInterClusterReplicationEndpoint {
467
468    static AtomicInteger replicateCount = new AtomicInteger();
469    static boolean failedOnce;
470
471    public InterClusterReplicationEndpointForTest() {
472      replicateCount.set(0);
473    }
474
475    @Override
476    public boolean replicate(ReplicateContext replicateContext) {
477      boolean success = super.replicate(replicateContext);
478      if (success) {
479        replicateCount.addAndGet(replicateContext.entries.size());
480      }
481      return success;
482    }
483
484    @Override
485    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
486      // Fail only once, we don't want to slow down the test.
487      if (failedOnce) {
488        return () -> ordinal;
489      } else {
490        failedOnce = true;
491        return () -> {
492          throw new IOException("Sample Exception: Failed to replicate.");
493        };
494      }
495    }
496  }
497
  /**
   * Endpoint that reports failure (returns false) until more than {@link #COUNT}
   * replicate() calls have been made, forcing the replication source to retry the
   * same batch repeatedly before finally succeeding.
   */
  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
    // Number of replicate() attempts rejected before success is reported.
    static int COUNT = 10;
    // Captures any assertion failure raised inside replicate(); rethrown by the test.
    static AtomicReference<Exception> ex = new AtomicReference<>(null);
    // Flips to true once replicate() finally reports success.
    static AtomicBoolean replicated = new AtomicBoolean(false);
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        // check row
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      }

      // Delegates counting/batch capture to the parent, then decides success.
      super.replicate(replicateContext);
      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());

      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
      return replicated.get();
    }
  }
518
519  // return a WALEntry filter which only accepts "row", but not other rows
520  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
521    static AtomicReference<Exception> ex = new AtomicReference<>(null);
522
523    @Override
524    public boolean replicate(ReplicateContext replicateContext) {
525      try {
526        super.replicate(replicateContext);
527        doAssert(row);
528      } catch (Exception e) {
529        ex.set(e);
530      }
531      return true;
532    }
533
534    @Override
535    public WALEntryFilter getWALEntryfilter() {
536      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
537        @Override
538        public Entry filter(Entry entry) {
539          ArrayList<Cell> cells = entry.getEdit().getCells();
540          int size = cells.size();
541          for (int i = size-1; i >= 0; i--) {
542            Cell cell = cells.get(i);
543            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
544              row, 0, row.length)) {
545              cells.remove(i);
546            }
547          }
548          return entry;
549        }
550      });
551    }
552  }
553
554  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
555    private static boolean passedEntry = false;
556    @Override
557    public Entry filter(Entry entry) {
558      passedEntry = true;
559      return entry;
560    }
561
562    public static boolean hasPassedAnEntry(){
563      return passedEntry;
564    }
565  }
566
  /**
   * Distinct subclass used only so the comma-separated filter configuration names two
   * different classes, verifying that multiple WALEntryFilters can be created reflectively.
   */
  public static class EverythingPassesWALEntryFilterSubclass
      extends EverythingPassesWALEntryFilter {
  }
570}