001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertNull;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeMap;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellComparatorImpl;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HRegionInfo;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.wal.WAL.Entry;
042import org.apache.hadoop.hbase.wal.WALEdit;
043import org.apache.hadoop.hbase.wal.WALKeyImpl;
044import org.junit.Assert;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050
051@Category({ReplicationTests.class, SmallTests.class})
052public class TestReplicationWALEntryFilters {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestReplicationWALEntryFilters.class);
057
058  static byte[] a = new byte[] {'a'};
059  static byte[] b = new byte[] {'b'};
060  static byte[] c = new byte[] {'c'};
061  static byte[] d = new byte[] {'d'};
062
063  @Test
064  public void testSystemTableWALEntryFilter() {
065    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
066
067    // meta
068    WALKeyImpl key1 = new WALKeyImpl(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
069        TableName.META_TABLE_NAME, System.currentTimeMillis());
070    Entry metaEntry = new Entry(key1, null);
071
072    assertNull(filter.filter(metaEntry));
073
074    // ns table
075    WALKeyImpl key2 =
076        new WALKeyImpl(new byte[0], TableName.NAMESPACE_TABLE_NAME, System.currentTimeMillis());
077    Entry nsEntry = new Entry(key2, null);
078    assertNull(filter.filter(nsEntry));
079
080    // user table
081
082    WALKeyImpl key3 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
083        System.currentTimeMillis());
084    Entry userEntry = new Entry(key3, null);
085
086    assertEquals(userEntry, filter.filter(userEntry));
087  }
088
089  @Test
090  public void testScopeWALEntryFilter() {
091    WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());
092
093    Entry userEntry = createEntry(null, a, b);
094    Entry userEntryA = createEntry(null, a);
095    Entry userEntryB = createEntry(null, b);
096    Entry userEntryEmpty = createEntry(null);
097
098    // no scopes
099    assertEquals(null, filter.filter(userEntry));
100
101    // empty scopes
102    TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
103    userEntry = createEntry(scopes, a, b);
104    assertEquals(null, filter.filter(userEntry));
105
106    // different scope
107    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
108    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
109    userEntry = createEntry(scopes, a, b);
110    // all kvs should be filtered
111    assertEquals(userEntryEmpty, filter.filter(userEntry));
112
113    // local scope
114    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
115    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
116    userEntry = createEntry(scopes, a, b);
117    assertEquals(userEntryEmpty, filter.filter(userEntry));
118    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
119    assertEquals(userEntryEmpty, filter.filter(userEntry));
120
121    // only scope a
122    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
123    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
124    userEntry = createEntry(scopes, a, b);
125    assertEquals(userEntryA, filter.filter(userEntry));
126    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
127    assertEquals(userEntryA, filter.filter(userEntry));
128
129    // only scope b
130    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
131    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
132    userEntry = createEntry(scopes, a, b);
133    assertEquals(userEntryB, filter.filter(userEntry));
134    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
135    assertEquals(userEntryB, filter.filter(userEntry));
136
137    // scope a and b
138    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
139    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
140    userEntry = createEntry(scopes, a, b);
141    assertEquals(userEntryB, filter.filter(userEntry));
142    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
143    assertEquals(userEntryB, filter.filter(userEntry));
144  }
145
146  WALEntryFilter nullFilter = new WALEntryFilter() {
147    @Override
148    public Entry filter(Entry entry) {
149      return null;
150    }
151  };
152
153  WALEntryFilter passFilter = new WALEntryFilter() {
154    @Override
155    public Entry filter(Entry entry) {
156      return entry;
157    }
158  };
159
160  @Test
161  public void testChainWALEntryFilter() {
162    Entry userEntry = createEntry(null, a, b, c);
163
164    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
165    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
166
167    filter = new ChainWALEntryFilter(passFilter, passFilter);
168    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
169
170    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
171    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
172
173    filter = new ChainWALEntryFilter(nullFilter);
174    assertEquals(null, filter.filter(userEntry));
175
176    filter = new ChainWALEntryFilter(nullFilter, passFilter);
177    assertEquals(null, filter.filter(userEntry));
178
179    filter = new ChainWALEntryFilter(passFilter, nullFilter);
180    assertEquals(null, filter.filter(userEntry));
181
182    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
183    assertEquals(null, filter.filter(userEntry));
184
185    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
186    assertEquals(null, filter.filter(userEntry));
187
188    // flatten
189    filter =
190        new ChainWALEntryFilter(
191          new ChainWALEntryFilter(passFilter,
192            new ChainWALEntryFilter(passFilter, passFilter),
193          new ChainWALEntryFilter(passFilter),
194          new ChainWALEntryFilter(passFilter)),
195          new ChainWALEntryFilter(passFilter));
196    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
197
198
199    filter =
200        new ChainWALEntryFilter(
201          new ChainWALEntryFilter(passFilter,
202            new ChainWALEntryFilter(passFilter,
203              new ChainWALEntryFilter(nullFilter))),
204          new ChainWALEntryFilter(passFilter));
205    assertEquals(null, filter.filter(userEntry));
206  }
207
208  @Test
209  public void testNamespaceTableCfWALEntryFilter() {
210    ReplicationPeer peer = mock(ReplicationPeer.class);
211    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
212
213    // 1. replicate_all flag is false, no namespaces and table-cfs config
214    when(peerConfig.replicateAllUserTables()).thenReturn(false);
215    when(peerConfig.getNamespaces()).thenReturn(null);
216    when(peerConfig.getTableCFsMap()).thenReturn(null);
217    when(peer.getPeerConfig()).thenReturn(peerConfig);
218    Entry userEntry = createEntry(null, a, b, c);
219    ChainWALEntryFilter filter =
220        new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
221    assertEquals(null, filter.filter(userEntry));
222
223    // 2. replicate_all flag is false, and only config table-cfs in peer
224    // empty map
225    userEntry = createEntry(null, a, b, c);
226    Map<TableName, List<String>> tableCfs = new HashMap<>();
227    when(peerConfig.replicateAllUserTables()).thenReturn(false);
228    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
229    when(peer.getPeerConfig()).thenReturn(peerConfig);
230    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
231    assertEquals(null, filter.filter(userEntry));
232
233    // table bar
234    userEntry = createEntry(null, a, b, c);
235    tableCfs = new HashMap<>();
236    tableCfs.put(TableName.valueOf("bar"), null);
237    when(peerConfig.replicateAllUserTables()).thenReturn(false);
238    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
239    when(peer.getPeerConfig()).thenReturn(peerConfig);
240    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
241    assertEquals(null, filter.filter(userEntry));
242
243    // table foo:a
244    userEntry = createEntry(null, a, b, c);
245    tableCfs = new HashMap<>();
246    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
247    when(peerConfig.replicateAllUserTables()).thenReturn(false);
248    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
249    when(peer.getPeerConfig()).thenReturn(peerConfig);
250    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
251    assertEquals(createEntry(null, a), filter.filter(userEntry));
252
253    // table foo:a,c
254    userEntry = createEntry(null, a, b, c, d);
255    tableCfs = new HashMap<>();
256    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
257    when(peerConfig.replicateAllUserTables()).thenReturn(false);
258    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
259    when(peer.getPeerConfig()).thenReturn(peerConfig);
260    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
261    assertEquals(createEntry(null, a,c), filter.filter(userEntry));
262
263    // 3. replicate_all flag is false, and only config namespaces in peer
264    when(peer.getTableCFs()).thenReturn(null);
265    // empty set
266    Set<String> namespaces = new HashSet<>();
267    when(peerConfig.replicateAllUserTables()).thenReturn(false);
268    when(peerConfig.getNamespaces()).thenReturn(namespaces);
269    when(peerConfig.getTableCFsMap()).thenReturn(null);
270    when(peer.getPeerConfig()).thenReturn(peerConfig);
271    userEntry = createEntry(null, a, b, c);
272    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
273    assertEquals(null, filter.filter(userEntry));
274
275    // namespace default
276    namespaces.add("default");
277    when(peerConfig.replicateAllUserTables()).thenReturn(false);
278    when(peerConfig.getNamespaces()).thenReturn(namespaces);
279    when(peer.getPeerConfig()).thenReturn(peerConfig);
280    userEntry = createEntry(null, a, b, c);
281    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
282    assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
283
284    // namespace ns1
285    namespaces = new HashSet<>();
286    namespaces.add("ns1");
287    when(peerConfig.replicateAllUserTables()).thenReturn(false);
288    when(peerConfig.getNamespaces()).thenReturn(namespaces);
289    when(peer.getPeerConfig()).thenReturn(peerConfig);
290    userEntry = createEntry(null, a, b, c);
291    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
292    assertEquals(null, filter.filter(userEntry));
293
294    // 4. replicate_all flag is false, and config namespaces and table-cfs both
295    // Namespaces config should not confict with table-cfs config
296    namespaces = new HashSet<>();
297    tableCfs = new HashMap<>();
298    namespaces.add("ns1");
299    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
300    when(peerConfig.replicateAllUserTables()).thenReturn(false);
301    when(peerConfig.getNamespaces()).thenReturn(namespaces);
302    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
303    when(peer.getPeerConfig()).thenReturn(peerConfig);
304    userEntry = createEntry(null, a, b, c);
305    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
306    assertEquals(createEntry(null, a, c), filter.filter(userEntry));
307
308    namespaces = new HashSet<>();
309    tableCfs = new HashMap<>();
310    namespaces.add("default");
311    tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
312    when(peerConfig.replicateAllUserTables()).thenReturn(false);
313    when(peerConfig.getNamespaces()).thenReturn(namespaces);
314    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
315    when(peer.getPeerConfig()).thenReturn(peerConfig);
316    userEntry = createEntry(null, a, b, c);
317    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
318    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
319
320    namespaces = new HashSet<>();
321    tableCfs = new HashMap<>();
322    namespaces.add("ns1");
323    tableCfs.put(TableName.valueOf("bar"), null);
324    when(peerConfig.replicateAllUserTables()).thenReturn(false);
325    when(peerConfig.getNamespaces()).thenReturn(namespaces);
326    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
327    when(peer.getPeerConfig()).thenReturn(peerConfig);
328    userEntry = createEntry(null, a, b, c);
329    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
330    assertEquals(null, filter.filter(userEntry));
331  }
332
333  @Test
334  public void testNamespaceTableCfWALEntryFilter2() {
335    ReplicationPeer peer = mock(ReplicationPeer.class);
336    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
337
338    // 1. replicate_all flag is true
339    // and no exclude namespaces and no exclude table-cfs config
340    when(peerConfig.replicateAllUserTables()).thenReturn(true);
341    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
342    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
343    when(peer.getPeerConfig()).thenReturn(peerConfig);
344    Entry userEntry = createEntry(null, a, b, c);
345    ChainWALEntryFilter filter =
346        new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
347    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
348
349    // 2. replicate_all flag is true, and only config exclude namespaces
350    // empty set
351    Set<String> namespaces = new HashSet<String>();
352    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
353    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
354    when(peer.getPeerConfig()).thenReturn(peerConfig);
355    userEntry = createEntry(null, a, b, c);
356    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
357    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
358
359    // exclude namespace default
360    namespaces.add("default");
361    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
362    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
363    when(peer.getPeerConfig()).thenReturn(peerConfig);
364    userEntry = createEntry(null, a, b, c);
365    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
366    assertEquals(null, filter.filter(userEntry));
367
368    // exclude namespace ns1
369    namespaces = new HashSet<String>();
370    namespaces.add("ns1");
371    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
372    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
373    when(peer.getPeerConfig()).thenReturn(peerConfig);
374    userEntry = createEntry(null, a, b, c);
375    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
376    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
377
378    // 3. replicate_all flag is true, and only config exclude table-cfs
379    // empty table-cfs map
380    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
381    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
382    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
383    when(peer.getPeerConfig()).thenReturn(peerConfig);
384    userEntry = createEntry(null, a, b, c);
385    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
386    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
387
388    // exclude table bar
389    tableCfs = new HashMap<TableName, List<String>>();
390    tableCfs.put(TableName.valueOf("bar"), null);
391    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
392    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
393    when(peer.getPeerConfig()).thenReturn(peerConfig);
394    userEntry = createEntry(null, a, b, c);
395    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
396    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
397
398    // exclude table foo:a
399    tableCfs = new HashMap<TableName, List<String>>();
400    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
401    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
402    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
403    when(peer.getPeerConfig()).thenReturn(peerConfig);
404    userEntry = createEntry(null, a, b, c);
405    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
406    assertEquals(createEntry(null, b, c), filter.filter(userEntry));
407
408    // 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
409    // exclude ns1 and table foo:a,c
410    namespaces = new HashSet<String>();
411    tableCfs = new HashMap<TableName, List<String>>();
412    namespaces.add("ns1");
413    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
414    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
415    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
416    when(peer.getPeerConfig()).thenReturn(peerConfig);
417    userEntry = createEntry(null, a, b, c);
418    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
419    assertEquals(createEntry(null, b), filter.filter(userEntry));
420
421    // exclude namespace default and table ns1:bar
422    namespaces = new HashSet<String>();
423    tableCfs = new HashMap<TableName, List<String>>();
424    namespaces.add("default");
425    tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
426    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
427    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
428    when(peer.getPeerConfig()).thenReturn(peerConfig);
429    userEntry = createEntry(null, a, b, c);
430    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
431    assertEquals(null, filter.filter(userEntry));
432  }
433
434  private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
435    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
436      System.currentTimeMillis(), scopes);
437    WALEdit edit1 = new WALEdit();
438
439    for (byte[] kv : kvs) {
440      edit1.add(new KeyValue(kv, kv, kv));
441    }
442    return new Entry(key1, edit1);
443  }
444
445  private void assertEquals(Entry e1, Entry e2) {
446    Assert.assertEquals(e1 == null, e2 == null);
447    if (e1 == null) {
448      return;
449    }
450
451    // do not compare WALKeys
452
453    // compare kvs
454    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
455    if (e1.getEdit() == null) {
456      return;
457    }
458    List<Cell> cells1 = e1.getEdit().getCells();
459    List<Cell> cells2 = e2.getEdit().getCells();
460    Assert.assertEquals(cells1.size(), cells2.size());
461    for (int i = 0; i < cells1.size(); i++) {
462      CellComparatorImpl.COMPARATOR.compare(cells1.get(i), cells2.get(i));
463    }
464  }
465}