/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Unit tests for the replication {@link WALEntryFilter} implementations:
 * {@link SystemTableWALEntryFilter}, {@link ScopeWALEntryFilter}, {@link ChainWALEntryFilter} and
 * {@link NamespaceTableCfWALEntryFilter}.
 * <p>
 * Entries are built by {@link #createEntry(TreeMap, byte[][])} for table {@code foo}; every cell
 * uses the same single byte as row, family and qualifier, so a cell is identified below simply by
 * that byte ({@code a}, {@code b}, {@code c}, {@code d}).
 */
@Category({ ReplicationTests.class, SmallTests.class })
public class TestReplicationWALEntryFilters {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationWALEntryFilters.class);

  // Single-byte values used as row, family and qualifier of the test cells.
  static byte[] a = new byte[] { 'a' };
  static byte[] b = new byte[] { 'b' };
  static byte[] c = new byte[] { 'c' };
  static byte[] d = new byte[] { 'd' };

  /**
   * Entries keyed by the meta table or the namespace table must be dropped (filter returns
   * {@code null}); an entry keyed by a user table must be passed through.
   */
  @Test
  public void testSystemTableWALEntryFilter() {
    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();

    // meta
    WALKeyImpl key1 =
      new WALKeyImpl(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
        TableName.META_TABLE_NAME, System.currentTimeMillis());
    Entry metaEntry = new Entry(key1, null);

    assertNull(filter.filter(metaEntry));

    // ns table
    WALKeyImpl key2 =
      new WALKeyImpl(new byte[0], TableName.NAMESPACE_TABLE_NAME, System.currentTimeMillis());
    Entry nsEntry = new Entry(key2, null);
    assertNull(filter.filter(nsEntry));

    // user table
    WALKeyImpl key3 =
      new WALKeyImpl(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis());
    Entry userEntry = new Entry(key3, null);

    assertEquals(userEntry, filter.filter(userEntry));
  }

  /**
   * Exercises {@link ScopeWALEntryFilter} (wrapped in a {@link ChainWALEntryFilter}) against
   * entries whose key carries no scopes, empty scopes, scopes for an unrelated family, LOCAL
   * scopes and GLOBAL scopes, and checks which cells remain in the returned entry.
   */
  @Test
  public void testScopeWALEntryFilter() {
    WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());

    Entry userEntry = createEntry(null, a, b);
    Entry userEntryA = createEntry(null, a);
    Entry userEntryB = createEntry(null, b);
    Entry userEntryEmpty = createEntry(null);

    // no scopes
    // now we will not filter out entries without a replication scope since serial replication
    // still need the sequence id, but the cells will all be filtered out.
    assertTrue(filter.filter(userEntry).getEdit().isEmpty());

    // empty scopes
    // ditto
    TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    userEntry = createEntry(scopes, a, b);
    assertTrue(filter.filter(userEntry).getEdit().isEmpty());

    // different scope
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    // all kvs should be filtered
    assertEquals(userEntryEmpty, filter.filter(userEntry));

    // local scope
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryEmpty, filter.filter(userEntry));
    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryEmpty, filter.filter(userEntry));

    // only scope a
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryA, filter.filter(userEntry));
    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryA, filter.filter(userEntry));

    // only scope b
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryB, filter.filter(userEntry));
    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryB, filter.filter(userEntry));

    // only scope b, on a fresh entry.
    // NOTE(review): this section was labelled "scope a and b" but repeats the "only scope b"
    // configuration (b GLOBAL, a LOCAL); the case where both a and b are GLOBAL is not covered.
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryB, filter.filter(userEntry));
    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryB, filter.filter(userEntry));
  }

  /** Filter that rejects every entry by returning {@code null}. */
  WALEntryFilter nullFilter = new WALEntryFilter() {
    @Override
    public Entry filter(Entry entry) {
      return null;
    }
  };

  /** Filter that accepts every entry by returning it unchanged. */
  WALEntryFilter passFilter = new WALEntryFilter() {
    @Override
    public Entry filter(Entry entry) {
      return entry;
    }
  };

  /**
   * A chain made only of pass-through filters must return the entry with all of its cells; a
   * chain containing a rejecting filter at any position or nesting depth must return
   * {@code null}.
   */
  @Test
  public void testChainWALEntryFilter() {
    Entry userEntry = createEntry(null, a, b, c);

    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, passFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    // flatten
    filter =
      new ChainWALEntryFilter(
        new ChainWALEntryFilter(passFilter,
          new ChainWALEntryFilter(passFilter, passFilter),
          new ChainWALEntryFilter(passFilter),
          new ChainWALEntryFilter(passFilter)),
        new ChainWALEntryFilter(passFilter));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter =
      new ChainWALEntryFilter(
        new ChainWALEntryFilter(passFilter,
          new ChainWALEntryFilter(passFilter,
            new ChainWALEntryFilter(nullFilter))),
        new ChainWALEntryFilter(passFilter));
    assertNull(filter.filter(userEntry));
  }

  /**
   * {@link NamespaceTableCfWALEntryFilter} with the peer's replicate_all flag set to
   * {@code false}: the peer's namespaces and table-cfs act as an include list. All entries are
   * for table {@code foo} (see {@link #createEntry(TreeMap, byte[][])}).
   */
  @Test
  public void testNamespaceTableCfWALEntryFilter() {
    ReplicationPeer peer = mock(ReplicationPeer.class);
    ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();

    // 1. replicate_all flag is false, no namespaces and table-cfs config
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    Entry userEntry = createEntry(null, a, b, c);
    ChainWALEntryFilter filter =
      new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // 2. replicate_all flag is false, and only config table-cfs in peer
    // empty map
    userEntry = createEntry(null, a, b, c);
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // table bar
    userEntry = createEntry(null, a, b, c);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("bar"), null);
    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // table foo:a
    userEntry = createEntry(null, a, b, c);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a), filter.filter(userEntry));

    // table foo:a,c
    userEntry = createEntry(null, a, b, c, d);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, c), filter.filter(userEntry));

    // 3. replicate_all flag is false, and only config namespaces in peer
    when(peer.getTableCFs()).thenReturn(null);
    // empty set
    Set<String> namespaces = new HashSet<>();
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
      .setTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // namespace default
    namespaces.add("default");
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // namespace ns1
    namespaces = new HashSet<>();
    namespaces.add("ns1");
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // 4. replicate_all flag is false, and config namespaces and table-cfs both
    // Namespaces config should not conflict with table-cfs config
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
      .setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, c), filter.filter(userEntry));

    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("default");
    tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
      .setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("bar"), null);
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
      .setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));
  }

  /**
   * {@link NamespaceTableCfWALEntryFilter} with the peer's replicate_all flag set to
   * {@code true}: the peer's exclude-namespaces and exclude-table-cfs act as an exclude list.
   * All entries are for table {@code foo} (see {@link #createEntry(TreeMap, byte[][])}).
   */
  @Test
  public void testNamespaceTableCfWALEntryFilter2() {
    ReplicationPeer peer = mock(ReplicationPeer.class);
    ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();

    // 1. replicate_all flag is true
    // and no exclude namespaces and no exclude table-cfs config
    peerConfigBuilder.setReplicateAllUserTables(true)
      .setExcludeNamespaces(null)
      .setExcludeTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    Entry userEntry = createEntry(null, a, b, c);
    ChainWALEntryFilter filter =
      new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // 2. replicate_all flag is true, and only config exclude namespaces
    // empty set
    Set<String> namespaces = new HashSet<>();
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude namespace default
    namespaces.add("default");
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // exclude namespace ns1
    namespaces = new HashSet<>();
    namespaces.add("ns1");
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // 3. replicate_all flag is true, and only config exclude table-cfs
    // empty table-cfs map
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude table bar
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("bar"), null);
    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude table foo:a
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, b, c), filter.filter(userEntry));

    // 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
    // exclude ns1 and table foo:a,c
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, b), filter.filter(userEntry));

    // exclude namespace default and table ns1:bar
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("default");
    tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));
  }

  /**
   * Builds a WAL entry for table {@code foo} with the given replication scopes.
   * @param scopes replication scopes handed to the {@link WALKeyImpl}; may be {@code null}
   * @param kvs one byte array per cell; each is used as the row, family and qualifier of a
   *          {@link KeyValue} added to the entry's edit, in the given order
   * @return a new entry holding one cell per element of {@code kvs}
   */
  private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
      System.currentTimeMillis(), scopes);
    WALEdit edit1 = new WALEdit();

    for (byte[] kv : kvs) {
      edit1.add(new KeyValue(kv, kv, kv));
    }
    return new Entry(key1, edit1);
  }

  /**
   * Asserts that two entries are both {@code null}, or both non-null with equivalent edits: both
   * edits {@code null}, or the same number of cells with each pair of cells at the same index
   * comparing equal under {@link CellComparatorImpl#COMPARATOR}. WAL keys are deliberately not
   * compared.
   * @param e1 the expected entry, may be {@code null}
   * @param e2 the actual entry, may be {@code null}
   */
  private void assertEquals(Entry e1, Entry e2) {
    Assert.assertEquals(e1 == null, e2 == null);
    if (e1 == null) {
      return;
    }

    // do not compare WALKeys

    // compare kvs
    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
    if (e1.getEdit() == null) {
      return;
    }
    List<Cell> cells1 = e1.getEdit().getCells();
    List<Cell> cells2 = e2.getEdit().getCells();
    Assert.assertEquals(cells1.size(), cells2.size());
    for (int i = 0; i < cells1.size(); i++) {
      // The comparator result must be asserted; merely calling compare() verifies nothing, which
      // would let a filter that keeps the wrong cells (but the right count) pass.
      Assert.assertEquals("Cell mismatch at index " + i, 0,
        CellComparatorImpl.COMPARATOR.compare(cells1.get(i), cells2.get(i)));
    }
  }
}