001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Mockito.mock;
023import static org.mockito.Mockito.when;
024
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeMap;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfoBuilder;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.testclassification.SmallTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.hbase.wal.WAL.Entry;
044import org.apache.hadoop.hbase.wal.WALEdit;
045import org.apache.hadoop.hbase.wal.WALKeyImpl;
046import org.junit.Assert;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
052
053@Category({ ReplicationTests.class, SmallTests.class })
054public class TestReplicationWALEntryFilters {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestReplicationWALEntryFilters.class);
059
060  static byte[] a = new byte[] { 'a' };
061  static byte[] b = new byte[] { 'b' };
062  static byte[] c = new byte[] { 'c' };
063  static byte[] d = new byte[] { 'd' };
064
065  @Test
066  public void testSystemTableWALEntryFilter() {
067    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
068
069    // meta
070    WALKeyImpl key1 =
071      new WALKeyImpl(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
072        TableName.META_TABLE_NAME, EnvironmentEdgeManager.currentTime());
073    Entry metaEntry = new Entry(key1, null);
074
075    assertNull(filter.filter(metaEntry));
076
077    // ns table
078    WALKeyImpl key2 = new WALKeyImpl(new byte[0], TableName.NAMESPACE_TABLE_NAME,
079      EnvironmentEdgeManager.currentTime());
080    Entry nsEntry = new Entry(key2, null);
081    assertNull(filter.filter(nsEntry));
082
083    // user table
084
085    WALKeyImpl key3 =
086      new WALKeyImpl(new byte[0], TableName.valueOf("foo"), EnvironmentEdgeManager.currentTime());
087    Entry userEntry = new Entry(key3, null);
088
089    assertEquals(userEntry, filter.filter(userEntry));
090  }
091
092  @Test
093  public void testScopeWALEntryFilter() {
094    WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());
095
096    Entry userEntry = createEntry(null, a, b);
097    Entry userEntryA = createEntry(null, a);
098    Entry userEntryB = createEntry(null, b);
099    Entry userEntryEmpty = createEntry(null);
100
101    // no scopes
102    // now we will not filter out entries without a replication scope since serial replication still
103    // need the sequence id, but the cells will all be filtered out.
104    assertTrue(filter.filter(userEntry).getEdit().isEmpty());
105
106    // empty scopes
107    // ditto
108    TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
109    userEntry = createEntry(scopes, a, b);
110    assertTrue(filter.filter(userEntry).getEdit().isEmpty());
111
112    // different scope
113    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
114    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
115    userEntry = createEntry(scopes, a, b);
116    // all kvs should be filtered
117    assertEquals(userEntryEmpty, filter.filter(userEntry));
118
119    // local scope
120    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
121    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
122    userEntry = createEntry(scopes, a, b);
123    assertEquals(userEntryEmpty, filter.filter(userEntry));
124    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
125    assertEquals(userEntryEmpty, filter.filter(userEntry));
126
127    // only scope a
128    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
129    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
130    userEntry = createEntry(scopes, a, b);
131    assertEquals(userEntryA, filter.filter(userEntry));
132    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
133    assertEquals(userEntryA, filter.filter(userEntry));
134
135    // only scope b
136    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
137    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
138    userEntry = createEntry(scopes, a, b);
139    assertEquals(userEntryB, filter.filter(userEntry));
140    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
141    assertEquals(userEntryB, filter.filter(userEntry));
142
143    // scope a and b
144    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
145    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
146    userEntry = createEntry(scopes, a, b);
147    assertEquals(userEntryB, filter.filter(userEntry));
148    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
149    assertEquals(userEntryB, filter.filter(userEntry));
150  }
151
152  WALEntryFilter nullFilter = new WALEntryFilter() {
153    @Override
154    public Entry filter(Entry entry) {
155      return null;
156    }
157  };
158
159  WALEntryFilter passFilter = new WALEntryFilter() {
160    @Override
161    public Entry filter(Entry entry) {
162      return entry;
163    }
164  };
165
166  @Test
167  public void testChainWALEntryFilter() {
168    Entry userEntry = createEntry(null, a, b, c);
169
170    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
171    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
172
173    filter = new ChainWALEntryFilter(passFilter, passFilter);
174    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
175
176    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
177    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
178
179    filter = new ChainWALEntryFilter(nullFilter);
180    assertEquals(null, filter.filter(userEntry));
181
182    filter = new ChainWALEntryFilter(nullFilter, passFilter);
183    assertEquals(null, filter.filter(userEntry));
184
185    filter = new ChainWALEntryFilter(passFilter, nullFilter);
186    assertEquals(null, filter.filter(userEntry));
187
188    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
189    assertEquals(null, filter.filter(userEntry));
190
191    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
192    assertEquals(null, filter.filter(userEntry));
193
194    // flatten
195    filter = new ChainWALEntryFilter(
196      new ChainWALEntryFilter(passFilter, new ChainWALEntryFilter(passFilter, passFilter),
197        new ChainWALEntryFilter(passFilter), new ChainWALEntryFilter(passFilter)),
198      new ChainWALEntryFilter(passFilter));
199    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
200
201    filter = new ChainWALEntryFilter(
202      new ChainWALEntryFilter(passFilter,
203        new ChainWALEntryFilter(passFilter, new ChainWALEntryFilter(nullFilter))),
204      new ChainWALEntryFilter(passFilter));
205    assertEquals(null, filter.filter(userEntry));
206  }
207
208  @Test
209  public void testNamespaceTableCfWALEntryFilter() {
210    ReplicationPeer peer = mock(ReplicationPeer.class);
211    ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
212
213    // 1. replicate_all flag is false, no namespaces and table-cfs config
214    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null);
215    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
216    Entry userEntry = createEntry(null, a, b, c);
217    ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
218    assertEquals(null, filter.filter(userEntry));
219
220    // 2. replicate_all flag is false, and only config table-cfs in peer
221    // empty map
222    userEntry = createEntry(null, a, b, c);
223    Map<TableName, List<String>> tableCfs = new HashMap<>();
224    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
225    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
226    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
227    assertEquals(null, filter.filter(userEntry));
228
229    // table bar
230    userEntry = createEntry(null, a, b, c);
231    tableCfs = new HashMap<>();
232    tableCfs.put(TableName.valueOf("bar"), null);
233    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
234    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
235    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
236    assertEquals(null, filter.filter(userEntry));
237
238    // table foo:a
239    userEntry = createEntry(null, a, b, c);
240    tableCfs = new HashMap<>();
241    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
242    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
243    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
244    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
245    assertEquals(createEntry(null, a), filter.filter(userEntry));
246
247    // table foo:a,c
248    userEntry = createEntry(null, a, b, c, d);
249    tableCfs = new HashMap<>();
250    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
251    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
252    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
253    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
254    assertEquals(createEntry(null, a, c), filter.filter(userEntry));
255
256    // 3. replicate_all flag is false, and only config namespaces in peer
257    when(peer.getTableCFs()).thenReturn(null);
258    // empty set
259    Set<String> namespaces = new HashSet<>();
260    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
261      .setTableCFsMap(null);
262    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
263    userEntry = createEntry(null, a, b, c);
264    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
265    assertEquals(null, filter.filter(userEntry));
266
267    // namespace default
268    namespaces.add("default");
269    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
270    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
271    userEntry = createEntry(null, a, b, c);
272    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
273    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
274
275    // namespace ns1
276    namespaces = new HashSet<>();
277    namespaces.add("ns1");
278    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
279    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
280    userEntry = createEntry(null, a, b, c);
281    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
282    assertEquals(null, filter.filter(userEntry));
283
284    // 4. replicate_all flag is false, and config namespaces and table-cfs both
285    // Namespaces config should not confict with table-cfs config
286    namespaces = new HashSet<>();
287    tableCfs = new HashMap<>();
288    namespaces.add("ns1");
289    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
290    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
291      .setTableCFsMap(tableCfs);
292    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
293    userEntry = createEntry(null, a, b, c);
294    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
295    assertEquals(createEntry(null, a, c), filter.filter(userEntry));
296
297    namespaces = new HashSet<>();
298    tableCfs = new HashMap<>();
299    namespaces.add("default");
300    tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
301    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
302      .setTableCFsMap(tableCfs);
303    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
304    userEntry = createEntry(null, a, b, c);
305    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
306    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
307
308    namespaces = new HashSet<>();
309    tableCfs = new HashMap<>();
310    namespaces.add("ns1");
311    tableCfs.put(TableName.valueOf("bar"), null);
312    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
313      .setTableCFsMap(tableCfs);
314    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
315    userEntry = createEntry(null, a, b, c);
316    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
317    assertEquals(null, filter.filter(userEntry));
318  }
319
320  @Test
321  public void testNamespaceTableCfWALEntryFilter2() {
322    ReplicationPeer peer = mock(ReplicationPeer.class);
323    ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
324
325    // 1. replicate_all flag is true
326    // and no exclude namespaces and no exclude table-cfs config
327    peerConfigBuilder.setReplicateAllUserTables(true).setExcludeNamespaces(null)
328      .setExcludeTableCFsMap(null);
329    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
330    Entry userEntry = createEntry(null, a, b, c);
331    ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
332    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
333
334    // 2. replicate_all flag is true, and only config exclude namespaces
335    // empty set
336    Set<String> namespaces = new HashSet<String>();
337    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
338    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
339    userEntry = createEntry(null, a, b, c);
340    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
341    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
342
343    // exclude namespace default
344    namespaces.add("default");
345    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
346    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
347    userEntry = createEntry(null, a, b, c);
348    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
349    assertEquals(null, filter.filter(userEntry));
350
351    // exclude namespace ns1
352    namespaces = new HashSet<String>();
353    namespaces.add("ns1");
354    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
355    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
356    userEntry = createEntry(null, a, b, c);
357    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
358    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
359
360    // 3. replicate_all flag is true, and only config exclude table-cfs
361    // empty table-cfs map
362    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
363    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
364    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
365    userEntry = createEntry(null, a, b, c);
366    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
367    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
368
369    // exclude table bar
370    tableCfs = new HashMap<TableName, List<String>>();
371    tableCfs.put(TableName.valueOf("bar"), null);
372    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
373    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
374    userEntry = createEntry(null, a, b, c);
375    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
376    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
377
378    // exclude table foo:a
379    tableCfs = new HashMap<TableName, List<String>>();
380    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
381    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
382    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
383    userEntry = createEntry(null, a, b, c);
384    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
385    assertEquals(createEntry(null, b, c), filter.filter(userEntry));
386
387    // 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
388    // exclude ns1 and table foo:a,c
389    namespaces = new HashSet<String>();
390    tableCfs = new HashMap<TableName, List<String>>();
391    namespaces.add("ns1");
392    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
393    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
394    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
395    userEntry = createEntry(null, a, b, c);
396    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
397    assertEquals(createEntry(null, b), filter.filter(userEntry));
398
399    // exclude namespace default and table ns1:bar
400    namespaces = new HashSet<String>();
401    tableCfs = new HashMap<TableName, List<String>>();
402    namespaces.add("default");
403    tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
404    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
405    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
406    userEntry = createEntry(null, a, b, c);
407    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
408    assertEquals(null, filter.filter(userEntry));
409  }
410
411  private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
412    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
413      EnvironmentEdgeManager.currentTime(), scopes);
414    WALEdit edit1 = new WALEdit();
415
416    for (byte[] kv : kvs) {
417      edit1.add(new KeyValue(kv, kv, kv));
418    }
419    return new Entry(key1, edit1);
420  }
421
422  private void assertEquals(Entry e1, Entry e2) {
423    Assert.assertEquals(e1 == null, e2 == null);
424    if (e1 == null) {
425      return;
426    }
427
428    // do not compare WALKeys
429
430    // compare kvs
431    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
432    if (e1.getEdit() == null) {
433      return;
434    }
435    List<Cell> cells1 = e1.getEdit().getCells();
436    List<Cell> cells2 = e2.getEdit().getCells();
437    Assert.assertEquals(cells1.size(), cells2.size());
438    for (int i = 0; i < cells1.size(); i++) {
439      CellComparatorImpl.COMPARATOR.compare(cells1.get(i), cells2.get(i));
440    }
441  }
442}