/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Unit tests for the replication WAL entry filters exercised below:
 * {@link SystemTableWALEntryFilter}, {@link ScopeWALEntryFilter}, {@link ChainWALEntryFilter} and
 * {@link NamespaceTableCfWALEntryFilter}.
 * <p>
 * Every entry built by {@link #createEntry(TreeMap, byte[][])} belongs to table {@code foo} and
 * carries one cell per supplied byte array, so a test names the cells it expects to survive simply
 * by listing their bytes.
 */
@Category({ ReplicationTests.class, SmallTests.class })
public class TestReplicationWALEntryFilters {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationWALEntryFilters.class);

  // Single-byte names shared by all tests; each one is used to build a cell in createEntry.
  private static final byte[] a = new byte[] { 'a' };
  private static final byte[] b = new byte[] { 'b' };
  private static final byte[] c = new byte[] { 'c' };
  private static final byte[] d = new byte[] { 'd' };

  /**
   * Entries keyed on the meta table and on the namespace table must be dropped (the filter returns
   * {@code null}); an entry keyed on a user table must be returned.
   */
  @Test
  public void testSystemTableWALEntryFilter() {
    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();

    // meta
    WALKeyImpl key1 =
      new WALKeyImpl(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
        TableName.META_TABLE_NAME, EnvironmentEdgeManager.currentTime());
    Entry metaEntry = new Entry(key1, null);

    assertNull(filter.filter(metaEntry));

    // ns table
    WALKeyImpl key2 = new WALKeyImpl(new byte[0], TableName.NAMESPACE_TABLE_NAME,
      EnvironmentEdgeManager.currentTime());
    Entry nsEntry = new Entry(key2, null);
    assertNull(filter.filter(nsEntry));

    // user table
    WALKeyImpl key3 =
      new WALKeyImpl(new byte[0], TableName.valueOf("foo"), EnvironmentEdgeManager.currentTime());
    Entry userEntry = new Entry(key3, null);

    assertEquals(userEntry, filter.filter(userEntry));
  }

  /**
   * Runs a {@link ScopeWALEntryFilter} (wrapped in a {@link ChainWALEntryFilter}) against entries
   * with missing, empty, local and global replication scopes and checks which cells remain.
   */
  @Test
  public void testScopeWALEntryFilter() {
    WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());

    Entry userEntry = createEntry(null, a, b);
    Entry userEntryA = createEntry(null, a);
    Entry userEntryB = createEntry(null, b);
    Entry userEntryAB = createEntry(null, a, b);
    Entry userEntryEmpty = createEntry(null);

    // no scopes
    // now we will not filter out entries without a replication scope since serial replication
    // still needs the sequence id, but the cells will all be filtered out.
    assertTrue(filter.filter(userEntry).getEdit().isEmpty());

    // empty scopes
    // ditto
    TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    userEntry = createEntry(scopes, a, b);
    assertTrue(filter.filter(userEntry).getEdit().isEmpty());

    // different scope
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    // all kvs should be filtered
    assertEquals(userEntryEmpty, filter.filter(userEntry));

    // local scope
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryEmpty, filter.filter(userEntry));
    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryEmpty, filter.filter(userEntry));

    // only scope a
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryA, filter.filter(userEntry));
    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryA, filter.filter(userEntry));

    // only scope b
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryB, filter.filter(userEntry));
    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryB, filter.filter(userEntry));

    // scope a and b
    // Previously this section repeated the "only scope b" case; it now covers both families
    // being globally scoped, in which case both cells are expected to survive.
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryAB, filter.filter(userEntry));
  }

  /** Filter stub that rejects every entry by returning {@code null}. */
  private final WALEntryFilter nullFilter = new WALEntryFilter() {
    @Override
    public Entry filter(Entry entry) {
      return null;
    }
  };

  /** Filter stub that returns every entry unchanged. */
  private final WALEntryFilter passFilter = new WALEntryFilter() {
    @Override
    public Entry filter(Entry entry) {
      return entry;
    }
  };

  /**
   * Checks that a {@link ChainWALEntryFilter} returns the entry when every member passes it,
   * returns {@code null} when any member rejects it, and behaves the same when chains are nested
   * inside other chains.
   */
  @Test
  public void testChainWALEntryFilter() {
    Entry userEntry = createEntry(null, a, b, c);

    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, passFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    // flatten
    filter = new ChainWALEntryFilter(
      new ChainWALEntryFilter(passFilter, new ChainWALEntryFilter(passFilter, passFilter),
        new ChainWALEntryFilter(passFilter), new ChainWALEntryFilter(passFilter)),
      new ChainWALEntryFilter(passFilter));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(
      new ChainWALEntryFilter(passFilter,
        new ChainWALEntryFilter(passFilter, new ChainWALEntryFilter(nullFilter))),
      new ChainWALEntryFilter(passFilter));
    assertNull(filter.filter(userEntry));
  }

  /**
   * Exercises {@link NamespaceTableCfWALEntryFilter} with the peer's replicate-all flag set to
   * {@code false}, so the peer's namespaces and table-cfs map act as an include list for the
   * {@code foo} table entries built by this test.
   */
  @Test
  public void testNamespaceTableCfWALEntryFilter() {
    ReplicationPeer peer = mock(ReplicationPeer.class);
    ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();

    // 1. replicate_all flag is false, no namespaces and table-cfs config
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    Entry userEntry = createEntry(null, a, b, c);
    ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // 2. replicate_all flag is false, and only config table-cfs in peer
    // empty map
    userEntry = createEntry(null, a, b, c);
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // table bar
    userEntry = createEntry(null, a, b, c);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("bar"), null);
    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // table foo:a
    userEntry = createEntry(null, a, b, c);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a), filter.filter(userEntry));

    // table foo:a,c
    userEntry = createEntry(null, a, b, c, d);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, c), filter.filter(userEntry));

    // 3. replicate_all flag is false, and only config namespaces in peer
    when(peer.getTableCFs()).thenReturn(null);
    // empty set
    Set<String> namespaces = new HashSet<>();
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
      .setTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // namespace default
    namespaces.add("default");
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // namespace ns1
    namespaces = new HashSet<>();
    namespaces.add("ns1");
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // 4. replicate_all flag is false, and config namespaces and table-cfs both
    // Namespaces config should not conflict with table-cfs config
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
      .setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, c), filter.filter(userEntry));

    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("default");
    tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
      .setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("bar"), null);
    peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
      .setTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));
  }

  /**
   * Exercises {@link NamespaceTableCfWALEntryFilter} with the peer's replicate-all flag set to
   * {@code true}, so the peer's exclude-namespaces and exclude-table-cfs settings act as an
   * exclude list for the {@code foo} table entries built by this test.
   */
  @Test
  public void testNamespaceTableCfWALEntryFilter2() {
    ReplicationPeer peer = mock(ReplicationPeer.class);
    ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();

    // 1. replicate_all flag is true
    // and no exclude namespaces and no exclude table-cfs config
    peerConfigBuilder.setReplicateAllUserTables(true).setExcludeNamespaces(null)
      .setExcludeTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    Entry userEntry = createEntry(null, a, b, c);
    ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // 2. replicate_all flag is true, and only config exclude namespaces
    // empty set
    Set<String> namespaces = new HashSet<>();
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude namespace default
    namespaces.add("default");
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // exclude namespace ns1
    namespaces = new HashSet<>();
    namespaces.add("ns1");
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // 3. replicate_all flag is true, and only config exclude table-cfs
    // empty table-cfs map
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude table bar
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("bar"), null);
    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude table foo:a
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
    peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, b, c), filter.filter(userEntry));

    // 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
    // exclude ns1 and table foo:a,c
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, b), filter.filter(userEntry));

    // exclude namespace default and table ns1:bar
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("default");
    tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<>());
    peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));
  }

  /**
   * Builds a WAL entry for table {@code foo}, stamped with the current time.
   * @param scopes replication scopes handed to the WAL key; may be {@code null}
   * @param kvs one cell is added to the edit per array, with that array passed as all three
   *          {@link KeyValue} constructor arguments
   * @return a new entry holding the key and an edit with {@code kvs.length} cells
   */
  private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
      EnvironmentEdgeManager.currentTime(), scopes);
    WALEdit edit1 = new WALEdit();

    for (byte[] kv : kvs) {
      edit1.add(new KeyValue(kv, kv, kv));
    }
    return new Entry(key1, edit1);
  }

  /**
   * Asserts that two entries are either both {@code null} or carry equal edits. WAL keys are
   * deliberately not compared. Edits match when both are {@code null}, or when they hold the same
   * number of cells and each pair of cells at the same index compares as equal under
   * {@link CellComparatorImpl#COMPARATOR}.
   * @param e1 expected entry, may be {@code null}
   * @param e2 actual entry, may be {@code null}
   */
  private void assertEquals(Entry e1, Entry e2) {
    Assert.assertEquals(e1 == null, e2 == null);
    if (e1 == null) {
      return;
    }

    // do not compare WALKeys

    // compare kvs
    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
    if (e1.getEdit() == null) {
      return;
    }
    List<Cell> cells1 = e1.getEdit().getCells();
    List<Cell> cells2 = e2.getEdit().getCells();
    Assert.assertEquals(cells1.size(), cells2.size());
    for (int i = 0; i < cells1.size(); i++) {
      // The comparator result must be asserted; merely calling compare() verifies nothing.
      Assert.assertEquals("cell mismatch at index " + i, 0,
        CellComparatorImpl.COMPARATOR.compare(cells1.get(i), cells2.get(i)));
    }
  }
}