001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.doNothing;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.spy;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.TreeMap;
032import java.util.UUID;
033import java.util.concurrent.Callable;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
050import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
051import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
052import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
054import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
055import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.ReplicationTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.util.Threads;
063import org.apache.hadoop.hbase.wal.WAL.Entry;
064import org.apache.hadoop.hbase.wal.WALEdit;
065import org.apache.hadoop.hbase.wal.WALKeyImpl;
066import org.apache.hadoop.hbase.zookeeper.ZKConfig;
067import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
068import org.junit.AfterClass;
069import org.junit.Assert;
070import org.junit.Before;
071import org.junit.BeforeClass;
072import org.junit.ClassRule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078/**
079 * Tests ReplicationSource and ReplicationEndpoint interactions
080 */
081@Category({ ReplicationTests.class, MediumTests.class })
082public class TestReplicationEndpoint extends TestReplicationBase {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086    HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
087
088  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
089
090  static int numRegionServers;
091
092  @BeforeClass
093  public static void setUpBeforeClass() throws Exception {
094    TestReplicationBase.setUpBeforeClass();
095    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
096  }
097
098  @AfterClass
099  public static void tearDownAfterClass() throws Exception {
100    TestReplicationBase.tearDownAfterClass();
101    // check stop is called
102    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
103  }
104
105  @Before
106  public void setup() throws Exception {
107    ReplicationEndpointForTest.contructedCount.set(0);
108    ReplicationEndpointForTest.startedCount.set(0);
109    ReplicationEndpointForTest.replicateCount.set(0);
110    ReplicationEndpointReturningFalse.replicated.set(false);
111    ReplicationEndpointForTest.lastEntries = null;
112    final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads();
113    for (RegionServerThread rs : rsThreads) {
114      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
115    }
116    // Wait for all log roll to finish
117    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
118      @Override
119      public boolean evaluate() throws Exception {
120        for (RegionServerThread rs : rsThreads) {
121          if (!rs.getRegionServer().walRollRequestFinished()) {
122            return false;
123          }
124        }
125        return true;
126      }
127
128      @Override
129      public String explainFailure() throws Exception {
130        List<String> logRollInProgressRsList = new ArrayList<>();
131        for (RegionServerThread rs : rsThreads) {
132          if (!rs.getRegionServer().walRollRequestFinished()) {
133            logRollInProgressRsList.add(rs.getRegionServer().toString());
134          }
135        }
136        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
137      }
138    });
139  }
140
141  @Test
142  public void testCustomReplicationEndpoint() throws Exception {
143    // test installing a custom replication endpoint other than the default one.
144    admin.addPeer("testCustomReplicationEndpoint",
145      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
146        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
147      null);
148
149    // check whether the class has been constructed and started
150    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
151      @Override
152      public boolean evaluate() throws Exception {
153        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
154      }
155    });
156
157    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
158      @Override
159      public boolean evaluate() throws Exception {
160        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
161      }
162    });
163
164    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
165
166    // now replicate some data.
167    doPut(Bytes.toBytes("row42"));
168
169    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
170      @Override
171      public boolean evaluate() throws Exception {
172        return ReplicationEndpointForTest.replicateCount.get() >= 1;
173      }
174    });
175
176    doAssert(Bytes.toBytes("row42"));
177
178    admin.removePeer("testCustomReplicationEndpoint");
179  }
180
181  @Test
182  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
183    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
184    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
185    int peerCount = admin.getPeersCount();
186    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
187    admin.addPeer(id,
188      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
189        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()),
190      null);
191    // This test is flakey and then there is so much stuff flying around in here its, hard to
192    // debug. Peer needs to be up for the edit to make it across. This wait on
193    // peer count seems to be a hack that has us not progress till peer is up.
194    if (admin.getPeersCount() <= peerCount) {
195      LOG.info("Waiting on peercount to go up from " + peerCount);
196      Threads.sleep(100);
197    }
198    // now replicate some data
199    doPut(row);
200
201    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
202      @Override
203      public boolean evaluate() throws Exception {
204        // Looks like replication endpoint returns false unless we put more than 10 edits. We
205        // only send over one edit.
206        int count = ReplicationEndpointForTest.replicateCount.get();
207        LOG.info("count=" + count);
208        return ReplicationEndpointReturningFalse.replicated.get();
209      }
210    });
211    if (ReplicationEndpointReturningFalse.ex.get() != null) {
212      throw ReplicationEndpointReturningFalse.ex.get();
213    }
214
215    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
216  }
217
218  @Test
219  public void testInterClusterReplication() throws Exception {
220    final String id = "testInterClusterReplication";
221
222    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
223    // This trick of waiting on peer to show up is taken from test above.
224    int peerCount = admin.getPeersCount();
225    admin
226      .addPeer(id,
227        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
228          .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
229        null);
230    // This test is flakey and then there is so much stuff flying around in here its, hard to
231    // debug. Peer needs to be up for the edit to make it across. This wait on
232    // peer count seems to be a hack that has us not progress till peer is up.
233    if (admin.getPeersCount() <= peerCount) {
234      LOG.info("Waiting on peercount to go up from " + peerCount);
235      Threads.sleep(100);
236    }
237
238    int totEdits = 0;
239
240    // Make sure edits are spread across regions because we do region based batching
241    // before shipping edits.
242    for (HRegion region : regions) {
243      RegionInfo hri = region.getRegionInfo();
244      byte[] row = hri.getStartKey();
245      for (int i = 0; i < 100; i++) {
246        if (row.length > 0) {
247          Put put = new Put(row);
248          put.addColumn(famName, row, row);
249          region.put(put);
250          totEdits++;
251        }
252      }
253    }
254
255    final int numEdits = totEdits;
256    LOG.info("Waiting on replication of {}", numEdits);
257    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
258      @Override
259      public boolean evaluate() throws Exception {
260        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
261      }
262
263      @Override
264      public String explainFailure() throws Exception {
265        String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = "
266          + InterClusterReplicationEndpointForTest.replicateCount.get();
267        return failure;
268      }
269    });
270
271    admin.removePeer("testInterClusterReplication");
272    UTIL1.deleteTableData(tableName);
273  }
274
275  @Test
276  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
277    ReplicationPeerConfig rpc =
278      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
279        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
280    // test that we can create mutliple WALFilters reflectively
281    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
282      EverythingPassesWALEntryFilter.class.getName() + ","
283        + EverythingPassesWALEntryFilterSubclass.class.getName());
284    admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
285    // now replicate some data.
286    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
287      doPut(connection, Bytes.toBytes("row1"));
288      doPut(connection, row);
289      doPut(connection, Bytes.toBytes("row2"));
290    }
291
292    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
293      @Override
294      public boolean evaluate() throws Exception {
295        return ReplicationEndpointForTest.replicateCount.get() >= 1;
296      }
297    });
298
299    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
300    // make sure our reflectively created filter is in the filter chain
301    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
302    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
303  }
304
305  @Test(expected = IOException.class)
306  public void testWALEntryFilterAddValidation() throws Exception {
307    ReplicationPeerConfig rpc =
308      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
309        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
310    // test that we can create mutliple WALFilters reflectively
311    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
312      "IAmNotARealWalEntryFilter");
313    admin.addPeer("testWALEntryFilterAddValidation", rpc);
314  }
315
316  @Test(expected = IOException.class)
317  public void testWALEntryFilterUpdateValidation() throws Exception {
318    ReplicationPeerConfig rpc =
319      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
320        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
321    // test that we can create mutliple WALFilters reflectively
322    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
323      "IAmNotARealWalEntryFilter");
324    admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
325  }
326
327  @Test
328  public void testMetricsSourceBaseSourcePassThrough() {
329    /*
330     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
331     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that
332     * metrics get written to both namespaces. Both of those classes wrap a
333     * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics.
334     * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls
335     * down through the two layers of wrapping to the actual BaseSource.
336     */
337    String id = "id";
338    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
339    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
340    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
341    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
342    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
343
344    MetricsReplicationSourceSource singleSourceSource =
345      new MetricsReplicationSourceSourceImpl(singleRms, id);
346    MetricsReplicationGlobalSourceSource globalSourceSource =
347      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
348    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
349    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
350
351    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>();
352    MetricsSource source =
353      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
354
355    String gaugeName = "gauge";
356    String singleGaugeName = "source.id." + gaugeName;
357    String globalGaugeName = "source." + gaugeName;
358    long delta = 1;
359    String counterName = "counter";
360    String singleCounterName = "source.id." + counterName;
361    String globalCounterName = "source." + counterName;
362    long count = 2;
363    source.decGauge(gaugeName, delta);
364    source.getMetricsContext();
365    source.getMetricsDescription();
366    source.getMetricsJmxContext();
367    source.getMetricsName();
368    source.incCounters(counterName, count);
369    source.incGauge(gaugeName, delta);
370    source.init();
371    source.removeMetric(gaugeName);
372    source.setGauge(gaugeName, delta);
373    source.updateHistogram(counterName, count);
374    source.incrFailedRecoveryQueue();
375
376    verify(singleRms).decGauge(singleGaugeName, delta);
377    verify(globalRms).decGauge(globalGaugeName, delta);
378    verify(globalRms).getMetricsContext();
379    verify(globalRms).getMetricsJmxContext();
380    verify(globalRms).getMetricsName();
381    verify(singleRms).incCounters(singleCounterName, count);
382    verify(globalRms).incCounters(globalCounterName, count);
383    verify(singleRms).incGauge(singleGaugeName, delta);
384    verify(globalRms).incGauge(globalGaugeName, delta);
385    verify(globalRms).init();
386    verify(singleRms).removeMetric(singleGaugeName);
387    verify(globalRms).removeMetric(globalGaugeName);
388    verify(singleRms).setGauge(singleGaugeName, delta);
389    verify(globalRms).setGauge(globalGaugeName, delta);
390    verify(singleRms).updateHistogram(singleCounterName, count);
391    verify(globalRms).updateHistogram(globalCounterName, count);
392    verify(spyglobalSourceSource).incrFailedRecoveryQueue();
393
394    // check singleSourceSourceByTable metrics.
395    // singleSourceSourceByTable map entry will be created only
396    // after calling #setAgeOfLastShippedOpByTable
397    boolean containsRandomNewTable =
398      source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
399    Assert.assertEquals(false, containsRandomNewTable);
400    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
401    containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
402    Assert.assertEquals(true, containsRandomNewTable);
403    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable");
404
405    // age should be greater than zero we created the entry with time in the past
406    Assert.assertTrue(msr.getLastShippedAge() > 0);
407    Assert.assertTrue(msr.getShippedBytes() > 0);
408
409  }
410
411  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
412    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
413    byte[] a = new byte[] { 'a' };
414    Entry entry = createEntry(tableName, null, a);
415    walEntriesWithSize.add(new Pair<>(entry, 10L));
416    return walEntriesWithSize;
417  }
418
419  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
420    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
421      EnvironmentEdgeManager.currentTime() - 1L, scopes);
422    WALEdit edit1 = new WALEdit();
423
424    for (byte[] kv : kvs) {
425      edit1.add(new KeyValue(kv, kv, kv));
426    }
427    return new Entry(key1, edit1);
428  }
429
430  private void doPut(byte[] row) throws IOException {
431    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
432      doPut(connection, row);
433    }
434  }
435
436  private void doPut(final Connection connection, final byte[] row) throws IOException {
437    try (Table t = connection.getTable(tableName)) {
438      Put put = new Put(row);
439      put.addColumn(famName, row, row);
440      t.put(put);
441    }
442  }
443
444  private static void doAssert(byte[] row) throws Exception {
445    if (ReplicationEndpointForTest.lastEntries == null) {
446      return; // first call
447    }
448    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
449    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
450    Assert.assertEquals(1, cells.size());
451    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
452      cells.get(0).getRowLength(), row, 0, row.length));
453  }
454
455  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
456    static UUID uuid = UTIL1.getRandomUUID();
457    static AtomicInteger contructedCount = new AtomicInteger();
458    static AtomicInteger startedCount = new AtomicInteger();
459    static AtomicInteger stoppedCount = new AtomicInteger();
460    static AtomicInteger replicateCount = new AtomicInteger();
461    static volatile List<Entry> lastEntries = null;
462
463    public ReplicationEndpointForTest() {
464      replicateCount.set(0);
465      contructedCount.incrementAndGet();
466    }
467
468    @Override
469    public UUID getPeerUUID() {
470      return uuid;
471    }
472
473    @Override
474    public boolean replicate(ReplicateContext replicateContext) {
475      replicateCount.incrementAndGet();
476      lastEntries = new ArrayList<>(replicateContext.entries);
477      return true;
478    }
479
480    @Override
481    public void start() {
482      startAsync();
483    }
484
485    @Override
486    public void stop() {
487      stopAsync();
488    }
489
490    @Override
491    protected void doStart() {
492      startedCount.incrementAndGet();
493      notifyStarted();
494    }
495
496    @Override
497    protected void doStop() {
498      stoppedCount.incrementAndGet();
499      notifyStopped();
500    }
501
502    @Override
503    public boolean canReplicateToSameCluster() {
504      return true;
505    }
506  }
507
508  /**
509   * Not used by unit tests, helpful for manual testing with replication.
510   * <p>
511   * Snippet for `hbase shell`:
512   *
513   * <pre>
514   * create 't', 'f'
515   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
516   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
517   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
518   * </pre>
519   */
520  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
521    private long duration;
522
523    public SleepingReplicationEndpointForTest() {
524      super();
525    }
526
527    @Override
528    public void init(Context context) throws IOException {
529      super.init(context);
530      if (this.ctx != null) {
531        duration = this.ctx.getConfiguration()
532          .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
533      }
534    }
535
536    @Override
537    public boolean replicate(ReplicateContext context) {
538      try {
539        Thread.sleep(duration);
540      } catch (InterruptedException e) {
541        Thread.currentThread().interrupt();
542        return false;
543      }
544      return super.replicate(context);
545    }
546  }
547
548  public static class InterClusterReplicationEndpointForTest
549    extends HBaseInterClusterReplicationEndpoint {
550
551    static AtomicInteger replicateCount = new AtomicInteger();
552    static boolean failedOnce;
553
554    public InterClusterReplicationEndpointForTest() {
555      replicateCount.set(0);
556    }
557
558    @Override
559    public boolean replicate(ReplicateContext replicateContext) {
560      boolean success = super.replicate(replicateContext);
561      if (success) {
562        replicateCount.addAndGet(replicateContext.entries.size());
563      }
564      return success;
565    }
566
567    @Override
568    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
569      // Fail only once, we don't want to slow down the test.
570      if (failedOnce) {
571        return () -> ordinal;
572      } else {
573        failedOnce = true;
574        return () -> {
575          throw new IOException("Sample Exception: Failed to replicate.");
576        };
577      }
578    }
579  }
580
581  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
582    static int COUNT = 10;
583    static AtomicReference<Exception> ex = new AtomicReference<>(null);
584    static AtomicBoolean replicated = new AtomicBoolean(false);
585
586    @Override
587    public boolean replicate(ReplicateContext replicateContext) {
588      try {
589        // check row
590        doAssert(row);
591      } catch (Exception e) {
592        ex.set(e);
593      }
594
595      super.replicate(replicateContext);
596      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
597
598      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
599      return replicated.get();
600    }
601  }
602
603  // return a WALEntry filter which only accepts "row", but not other rows
604  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
605    static AtomicReference<Exception> ex = new AtomicReference<>(null);
606
607    @Override
608    public boolean replicate(ReplicateContext replicateContext) {
609      try {
610        super.replicate(replicateContext);
611        doAssert(row);
612      } catch (Exception e) {
613        ex.set(e);
614      }
615      return true;
616    }
617
618    @Override
619    public WALEntryFilter getWALEntryfilter() {
620      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
621        @Override
622        public Entry filter(Entry entry) {
623          ArrayList<Cell> cells = entry.getEdit().getCells();
624          int size = cells.size();
625          for (int i = size - 1; i >= 0; i--) {
626            Cell cell = cells.get(i);
627            if (
628              !Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), row, 0,
629                row.length)
630            ) {
631              cells.remove(i);
632            }
633          }
634          return entry;
635        }
636      });
637    }
638  }
639
640  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
641    private static boolean passedEntry = false;
642
643    @Override
644    public Entry filter(Entry entry) {
645      passedEntry = true;
646      return entry;
647    }
648
649    public static boolean hasPassedAnEntry() {
650      return passedEntry;
651    }
652  }
653
654  public static class EverythingPassesWALEntryFilterSubclass
655    extends EverythingPassesWALEntryFilter {
656  }
657}