001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Mockito.mock;
023import static org.mockito.Mockito.when;
024
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.TreeMap;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellComparatorImpl;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfoBuilder;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.testclassification.SmallTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.wal.WAL.Entry;
043import org.apache.hadoop.hbase.wal.WALEdit;
044import org.apache.hadoop.hbase.wal.WALKeyImpl;
045import org.junit.Assert;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052@Category({ ReplicationTests.class, SmallTests.class })
053public class TestReplicationWALEntryFilters {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057      HBaseClassTestRule.forClass(TestReplicationWALEntryFilters.class);
058
059  static byte[] a = new byte[] {'a'};
060  static byte[] b = new byte[] {'b'};
061  static byte[] c = new byte[] {'c'};
062  static byte[] d = new byte[] {'d'};
063
064  @Test
065  public void testSystemTableWALEntryFilter() {
066    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
067
068    // meta
069    WALKeyImpl key1 =
070      new WALKeyImpl(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
071        TableName.META_TABLE_NAME, System.currentTimeMillis());
072    Entry metaEntry = new Entry(key1, null);
073
074    assertNull(filter.filter(metaEntry));
075
076    // ns table
077    WALKeyImpl key2 =
078        new WALKeyImpl(new byte[0], TableName.NAMESPACE_TABLE_NAME, System.currentTimeMillis());
079    Entry nsEntry = new Entry(key2, null);
080    assertNull(filter.filter(nsEntry));
081
082    // user table
083
084    WALKeyImpl key3 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
085        System.currentTimeMillis());
086    Entry userEntry = new Entry(key3, null);
087
088    assertEquals(userEntry, filter.filter(userEntry));
089  }
090
091  @Test
092  public void testScopeWALEntryFilter() {
093    WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());
094
095    Entry userEntry = createEntry(null, a, b);
096    Entry userEntryA = createEntry(null, a);
097    Entry userEntryB = createEntry(null, b);
098    Entry userEntryEmpty = createEntry(null);
099
100    // no scopes
101    // now we will not filter out entries without a replication scope since serial replication still
102    // need the sequence id, but the cells will all be filtered out.
103    assertTrue(filter.filter(userEntry).getEdit().isEmpty());
104
105    // empty scopes
106    // ditto
107    TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
108    userEntry = createEntry(scopes, a, b);
109    assertTrue(filter.filter(userEntry).getEdit().isEmpty());
110
111    // different scope
112    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
113    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
114    userEntry = createEntry(scopes, a, b);
115    // all kvs should be filtered
116    assertEquals(userEntryEmpty, filter.filter(userEntry));
117
118    // local scope
119    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
120    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
121    userEntry = createEntry(scopes, a, b);
122    assertEquals(userEntryEmpty, filter.filter(userEntry));
123    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
124    assertEquals(userEntryEmpty, filter.filter(userEntry));
125
126    // only scope a
127    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
128    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
129    userEntry = createEntry(scopes, a, b);
130    assertEquals(userEntryA, filter.filter(userEntry));
131    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
132    assertEquals(userEntryA, filter.filter(userEntry));
133
134    // only scope b
135    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
136    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
137    userEntry = createEntry(scopes, a, b);
138    assertEquals(userEntryB, filter.filter(userEntry));
139    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
140    assertEquals(userEntryB, filter.filter(userEntry));
141
142    // scope a and b
143    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
144    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
145    userEntry = createEntry(scopes, a, b);
146    assertEquals(userEntryB, filter.filter(userEntry));
147    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
148    assertEquals(userEntryB, filter.filter(userEntry));
149  }
150
151  WALEntryFilter nullFilter = new WALEntryFilter() {
152    @Override
153    public Entry filter(Entry entry) {
154      return null;
155    }
156  };
157
158  WALEntryFilter passFilter = new WALEntryFilter() {
159    @Override
160    public Entry filter(Entry entry) {
161      return entry;
162    }
163  };
164
165  @Test
166  public void testChainWALEntryFilter() {
167    Entry userEntry = createEntry(null, a, b, c);
168
169    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
170    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
171
172    filter = new ChainWALEntryFilter(passFilter, passFilter);
173    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
174
175    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
176    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
177
178    filter = new ChainWALEntryFilter(nullFilter);
179    assertEquals(null, filter.filter(userEntry));
180
181    filter = new ChainWALEntryFilter(nullFilter, passFilter);
182    assertEquals(null, filter.filter(userEntry));
183
184    filter = new ChainWALEntryFilter(passFilter, nullFilter);
185    assertEquals(null, filter.filter(userEntry));
186
187    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
188    assertEquals(null, filter.filter(userEntry));
189
190    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
191    assertEquals(null, filter.filter(userEntry));
192
193    // flatten
194    filter =
195        new ChainWALEntryFilter(
196          new ChainWALEntryFilter(passFilter,
197            new ChainWALEntryFilter(passFilter, passFilter),
198          new ChainWALEntryFilter(passFilter),
199          new ChainWALEntryFilter(passFilter)),
200          new ChainWALEntryFilter(passFilter));
201    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
202
203
204    filter =
205        new ChainWALEntryFilter(
206          new ChainWALEntryFilter(passFilter,
207            new ChainWALEntryFilter(passFilter,
208              new ChainWALEntryFilter(nullFilter))),
209          new ChainWALEntryFilter(passFilter));
210    assertEquals(null, filter.filter(userEntry));
211  }
212
213  @Test
214  public void testNamespaceTableCfWALEntryFilter() {
215    ReplicationPeer peer = mock(ReplicationPeer.class);
216    ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
217
218    // 1. replicate_all flag is false, no namespaces and table-cfs config
219    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null);
220    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
221    Entry userEntry = createEntry(null, a, b, c);
222    ChainWALEntryFilter filter =
223        new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
224    assertEquals(null, filter.filter(userEntry));
225
226    // 2. replicate_all flag is false, and only config table-cfs in peer
227    // empty map
228    userEntry = createEntry(null, a, b, c);
229    Map<TableName, List<String>> tableCfs = new HashMap<>();
230    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
231    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
232    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
233    assertEquals(null, filter.filter(userEntry));
234
235    // table bar
236    userEntry = createEntry(null, a, b, c);
237    tableCfs = new HashMap<>();
238    tableCfs.put(TableName.valueOf("bar"), null);
239    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
240    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
241    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
242    assertEquals(null, filter.filter(userEntry));
243
244    // table foo:a
245    userEntry = createEntry(null, a, b, c);
246    tableCfs = new HashMap<>();
247    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
248    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
249    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
250    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
251    assertEquals(createEntry(null, a), filter.filter(userEntry));
252
253    // table foo:a,c
254    userEntry = createEntry(null, a, b, c, d);
255    tableCfs = new HashMap<>();
256    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
257    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
258    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
259    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
260    assertEquals(createEntry(null, a,c), filter.filter(userEntry));
261
262    // 3. replicate_all flag is false, and only config namespaces in peer
263    when(peer.getTableCFs()).thenReturn(null);
264    // empty set
265    Set<String> namespaces = new HashSet<>();
266    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
267      .setTableCFsMap(null);
268    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
269    userEntry = createEntry(null, a, b, c);
270    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
271    assertEquals(null, filter.filter(userEntry));
272
273    // namespace default
274    namespaces.add("default");
275    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
276    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
277    userEntry = createEntry(null, a, b, c);
278    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
279    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
280
281    // namespace ns1
282    namespaces = new HashSet<>();
283    namespaces.add("ns1");
284    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
285    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
286    userEntry = createEntry(null, a, b, c);
287    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
288    assertEquals(null, filter.filter(userEntry));
289
290    // 4. replicate_all flag is false, and config namespaces and table-cfs both
291    // Namespaces config should not confict with table-cfs config
292    namespaces = new HashSet<>();
293    tableCfs = new HashMap<>();
294    namespaces.add("ns1");
295    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
296    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
297      .setTableCFsMap(tableCfs);
298    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
299    userEntry = createEntry(null, a, b, c);
300    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
301    assertEquals(createEntry(null, a, c), filter.filter(userEntry));
302
303    namespaces = new HashSet<>();
304    tableCfs = new HashMap<>();
305    namespaces.add("default");
306    tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
307    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
308      .setTableCFsMap(tableCfs);
309    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
310    userEntry = createEntry(null, a, b, c);
311    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
312    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
313
314    namespaces = new HashSet<>();
315    tableCfs = new HashMap<>();
316    namespaces.add("ns1");
317    tableCfs.put(TableName.valueOf("bar"), null);
318    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
319      .setTableCFsMap(tableCfs);
320    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
321    userEntry = createEntry(null, a, b, c);
322    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
323    assertEquals(null, filter.filter(userEntry));
324  }
325
326  @Test
327  public void testNamespaceTableCfWALEntryFilter2() {
328    ReplicationPeer peer = mock(ReplicationPeer.class);
329    ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
330
331    // 1. replicate_all flag is true
332    // and no exclude namespaces and no exclude table-cfs config
333    peerConfigBuilder.setReplicateAllUserTables(true)
334      .setExcludeNamespaces(null)
335      .setExcludeTableCFsMap(null);
336    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
337    Entry userEntry = createEntry(null, a, b, c);
338    ChainWALEntryFilter filter =
339        new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
340    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
341
342    // 2. replicate_all flag is true, and only config exclude namespaces
343    // empty set
344    Set<String> namespaces = new HashSet<String>();
345    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
346    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
347    userEntry = createEntry(null, a, b, c);
348    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
349    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
350
351    // exclude namespace default
352    namespaces.add("default");
353    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
354    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
355    userEntry = createEntry(null, a, b, c);
356    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
357    assertEquals(null, filter.filter(userEntry));
358
359    // exclude namespace ns1
360    namespaces = new HashSet<String>();
361    namespaces.add("ns1");
362    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
363    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
364    userEntry = createEntry(null, a, b, c);
365    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
366    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
367
368    // 3. replicate_all flag is true, and only config exclude table-cfs
369    // empty table-cfs map
370    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
371    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
372    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
373    userEntry = createEntry(null, a, b, c);
374    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
375    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
376
377    // exclude table bar
378    tableCfs = new HashMap<TableName, List<String>>();
379    tableCfs.put(TableName.valueOf("bar"), null);
380    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
381    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
382    userEntry = createEntry(null, a, b, c);
383    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
384    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
385
386    // exclude table foo:a
387    tableCfs = new HashMap<TableName, List<String>>();
388    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
389    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
390    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
391    userEntry = createEntry(null, a, b, c);
392    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
393    assertEquals(createEntry(null, b, c), filter.filter(userEntry));
394
395    // 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
396    // exclude ns1 and table foo:a,c
397    namespaces = new HashSet<String>();
398    tableCfs = new HashMap<TableName, List<String>>();
399    namespaces.add("ns1");
400    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
401    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
402    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
403    userEntry = createEntry(null, a, b, c);
404    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
405    assertEquals(createEntry(null, b), filter.filter(userEntry));
406
407    // exclude namespace default and table ns1:bar
408    namespaces = new HashSet<String>();
409    tableCfs = new HashMap<TableName, List<String>>();
410    namespaces.add("default");
411    tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
412    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
413    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
414    userEntry = createEntry(null, a, b, c);
415    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
416    assertEquals(null, filter.filter(userEntry));
417  }
418
419  private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
420    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
421      System.currentTimeMillis(), scopes);
422    WALEdit edit1 = new WALEdit();
423
424    for (byte[] kv : kvs) {
425      edit1.add(new KeyValue(kv, kv, kv));
426    }
427    return new Entry(key1, edit1);
428  }
429
430  private void assertEquals(Entry e1, Entry e2) {
431    Assert.assertEquals(e1 == null, e2 == null);
432    if (e1 == null) {
433      return;
434    }
435
436    // do not compare WALKeys
437
438    // compare kvs
439    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
440    if (e1.getEdit() == null) {
441      return;
442    }
443    List<Cell> cells1 = e1.getEdit().getCells();
444    List<Cell> cells2 = e2.getEdit().getCells();
445    Assert.assertEquals(cells1.size(), cells2.size());
446    for (int i = 0; i < cells1.size(); i++) {
447      CellComparatorImpl.COMPARATOR.compare(cells1.get(i), cells2.get(i));
448    }
449  }
450}