/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests for the replication {@link WALEntryFilter} implementations exercised below:
 * {@link SystemTableWALEntryFilter}, {@link ScopeWALEntryFilter}, {@link ChainWALEntryFilter}
 * and {@link NamespaceTableCfWALEntryFilter}.
 * <p>
 * Entries built by {@link #createEntry(TreeMap, byte[][])} always belong to table {@code foo};
 * each supplied byte array is used as row, family and qualifier of one cell.
 */
@Category({ReplicationTests.class, SmallTests.class})
public class TestReplicationWALEntryFilters {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationWALEntryFilters.class);

  // Single-byte values used as row/family/qualifier of the test cells.
  static byte[] a = new byte[] {'a'};
  static byte[] b = new byte[] {'b'};
  static byte[] c = new byte[] {'c'};
  static byte[] d = new byte[] {'d'};

  /**
   * Entries keyed by the meta and namespace tables must be dropped; an entry of a user table
   * must be returned.
   */
  @Test
  public void testSystemTableWALEntryFilter() {
    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();

    // meta
    WALKeyImpl key1 = new WALKeyImpl(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
        TableName.META_TABLE_NAME, System.currentTimeMillis());
    Entry metaEntry = new Entry(key1, null);

    assertNull(filter.filter(metaEntry));

    // ns table
    WALKeyImpl key2 =
        new WALKeyImpl(new byte[0], TableName.NAMESPACE_TABLE_NAME, System.currentTimeMillis());
    Entry nsEntry = new Entry(key2, null);
    assertNull(filter.filter(nsEntry));

    // user table
    WALKeyImpl key3 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
        System.currentTimeMillis());
    Entry userEntry = new Entry(key3, null);

    assertEquals(userEntry, filter.filter(userEntry));
  }

  /**
   * Checks which cells survive a chained {@link ScopeWALEntryFilter} for various replication
   * scope maps attached to the WAL key.
   */
  @Test
  public void testScopeWALEntryFilter() {
    WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());

    Entry userEntry = createEntry(null, a, b);
    Entry userEntryA = createEntry(null, a);
    Entry userEntryB = createEntry(null, b);
    Entry userEntryAB = createEntry(null, a, b);
    Entry userEntryEmpty = createEntry(null);

    // no scopes
    assertNull(filter.filter(userEntry));

    // empty scopes
    TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    userEntry = createEntry(scopes, a, b);
    assertNull(filter.filter(userEntry));

    // different scope
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    // all kvs should be filtered
    assertEquals(userEntryEmpty, filter.filter(userEntry));

    // local scope
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryEmpty, filter.filter(userEntry));
    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryEmpty, filter.filter(userEntry));

    // only scope a
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryA, filter.filter(userEntry));
    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryA, filter.filter(userEntry));

    // only scope b
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryB, filter.filter(userEntry));
    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
    assertEquals(userEntryB, filter.filter(userEntry));

    // scope a and b: both families are globally scoped, so both cells must be kept.
    // (Previously this section repeated the "only scope b" case verbatim.)
    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
    userEntry = createEntry(scopes, a, b);
    assertEquals(userEntryAB, filter.filter(userEntry));
  }

  /** Filter that drops every entry. */
  WALEntryFilter nullFilter = new WALEntryFilter() {
    @Override
    public Entry filter(Entry entry) {
      return null;
    }
  };

  /** Filter that returns every entry unchanged. */
  WALEntryFilter passFilter = new WALEntryFilter() {
    @Override
    public Entry filter(Entry entry) {
      return entry;
    }
  };

  /**
   * A chain must return the entry only when every member passes it, including when chains are
   * nested inside other chains.
   */
  @Test
  public void testChainWALEntryFilter() {
    Entry userEntry = createEntry(null, a, b, c);

    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, passFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(passFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
    assertNull(filter.filter(userEntry));

    // flatten
    filter =
        new ChainWALEntryFilter(
          new ChainWALEntryFilter(passFilter,
            new ChainWALEntryFilter(passFilter, passFilter),
          new ChainWALEntryFilter(passFilter),
          new ChainWALEntryFilter(passFilter)),
          new ChainWALEntryFilter(passFilter));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    filter =
        new ChainWALEntryFilter(
          new ChainWALEntryFilter(passFilter,
            new ChainWALEntryFilter(passFilter,
              new ChainWALEntryFilter(nullFilter))),
          new ChainWALEntryFilter(passFilter));
    assertNull(filter.filter(userEntry));
  }

  /**
   * {@link NamespaceTableCfWALEntryFilter} with {@code replicateAllUserTables() == false}:
   * only the configured namespaces / table-cfs are expected to pass.
   */
  @Test
  public void testNamespaceTableCfWALEntryFilter() {
    ReplicationPeer peer = mock(ReplicationPeer.class);
    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);

    // 1. replicate_all flag is false, no namespaces and table-cfs config
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getNamespaces()).thenReturn(null);
    when(peerConfig.getTableCFsMap()).thenReturn(null);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    Entry userEntry = createEntry(null, a, b, c);
    ChainWALEntryFilter filter =
        new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // 2. replicate_all flag is false, and only config table-cfs in peer
    // empty map
    userEntry = createEntry(null, a, b, c);
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // table bar
    userEntry = createEntry(null, a, b, c);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("bar"), null);
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // table foo:a
    userEntry = createEntry(null, a, b, c);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a), filter.filter(userEntry));

    // table foo:a,c
    userEntry = createEntry(null, a, b, c, d);
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, c), filter.filter(userEntry));

    // 3. replicate_all flag is false, and only config namespaces in peer
    when(peer.getTableCFs()).thenReturn(null);
    // empty set
    Set<String> namespaces = new HashSet<>();
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getNamespaces()).thenReturn(namespaces);
    when(peerConfig.getTableCFsMap()).thenReturn(null);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // namespace default
    namespaces.add("default");
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getNamespaces()).thenReturn(namespaces);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // namespace ns1
    namespaces = new HashSet<>();
    namespaces.add("ns1");
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getNamespaces()).thenReturn(namespaces);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // 4. replicate_all flag is false, and config namespaces and table-cfs both
    // Namespaces config should not conflict with table-cfs config
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getNamespaces()).thenReturn(namespaces);
    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, c), filter.filter(userEntry));

    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("default");
    tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getNamespaces()).thenReturn(namespaces);
    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("bar"), null);
    when(peerConfig.replicateAllUserTables()).thenReturn(false);
    when(peerConfig.getNamespaces()).thenReturn(namespaces);
    when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));
  }

  /**
   * {@link NamespaceTableCfWALEntryFilter} with {@code replicateAllUserTables() == true}:
   * everything is expected to pass except the configured exclude namespaces / table-cfs.
   */
  @Test
  public void testNamespaceTableCfWALEntryFilter2() {
    ReplicationPeer peer = mock(ReplicationPeer.class);
    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);

    // 1. replicate_all flag is true
    // and no exclude namespaces and no exclude table-cfs config
    when(peerConfig.replicateAllUserTables()).thenReturn(true);
    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    Entry userEntry = createEntry(null, a, b, c);
    ChainWALEntryFilter filter =
        new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // 2. replicate_all flag is true, and only config exclude namespaces
    // empty set
    Set<String> namespaces = new HashSet<>();
    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude namespace default
    namespaces.add("default");
    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));

    // exclude namespace ns1
    namespaces = new HashSet<>();
    namespaces.add("ns1");
    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // 3. replicate_all flag is true, and only config exclude table-cfs
    // empty table-cfs map
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude table bar
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("bar"), null);
    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));

    // exclude table foo:a
    tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
    when(peerConfig.getExcludeNamespaces()).thenReturn(null);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, b, c), filter.filter(userEntry));

    // 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
    // exclude ns1 and table foo:a,c
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("ns1");
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertEquals(createEntry(null, b), filter.filter(userEntry));

    // exclude namespace default and table ns1:bar
    namespaces = new HashSet<>();
    tableCfs = new HashMap<>();
    namespaces.add("default");
    tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<>());
    when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
    when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
    when(peer.getPeerConfig()).thenReturn(peerConfig);
    userEntry = createEntry(null, a, b, c);
    filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
    assertNull(filter.filter(userEntry));
  }

  /**
   * Builds a WAL entry for table {@code foo} with the current time as write time.
   *
   * @param scopes replication scopes passed to the WAL key; may be {@code null}
   * @param kvs one cell is added per element, using the element as row, family and qualifier
   * @return the new entry
   */
  private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
        System.currentTimeMillis(), scopes);
    WALEdit edit1 = new WALEdit();

    for (byte[] kv : kvs) {
      edit1.add(new KeyValue(kv, kv, kv));
    }
    return new Entry(key1, edit1);
  }

  /**
   * Asserts that two entries are both {@code null}, both lack an edit, or carry pairwise equal
   * cells in the same order. WAL keys are deliberately not compared.
   *
   * @param e1 expected entry, may be {@code null}
   * @param e2 actual entry, may be {@code null}
   */
  private void assertEquals(Entry e1, Entry e2) {
    Assert.assertEquals(e1 == null, e2 == null);
    if (e1 == null) {
      return;
    }

    // do not compare WALKeys

    // compare kvs
    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
    if (e1.getEdit() == null) {
      return;
    }
    List<Cell> cells1 = e1.getEdit().getCells();
    List<Cell> cells2 = e2.getEdit().getCells();
    Assert.assertEquals(cells1.size(), cells2.size());
    for (int i = 0; i < cells1.size(); i++) {
      // The comparator result used to be discarded, so cell contents were never verified.
      Assert.assertEquals("Cell at index " + i + " differs", 0,
        CellComparatorImpl.COMPARATOR.compare(cells1.get(i), cells2.get(i)));
    }
  }
}