001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.net.SocketAddress; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellBuilder; 032import org.apache.hadoop.hbase.CellBuilderFactory; 033import org.apache.hadoop.hbase.CellBuilderType; 034import org.apache.hadoop.hbase.CellScanner; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; 040import org.apache.hadoop.hbase.client.AsyncClusterConnection; 041import org.apache.hadoop.hbase.client.AsyncConnection; 042import org.apache.hadoop.hbase.client.AsyncTable; 043import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 044import org.apache.hadoop.hbase.client.ConnectionRegistry; 045import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry; 046import org.apache.hadoop.hbase.client.DummyAsyncClusterConnection; 047import org.apache.hadoop.hbase.client.DummyAsyncTable; 048import org.apache.hadoop.hbase.client.Row; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.testclassification.ReplicationTests; 051import org.apache.hadoop.hbase.testclassification.SmallTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.junit.ClassRule; 054import org.junit.Rule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.junit.rules.TestName; 058 059import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 062 063/** 064 * Simple test of sink-side wal entry filter facility. 065 */ 066@Category({ ReplicationTests.class, SmallTests.class }) 067public class TestWALEntrySinkFilter { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class); 072 073 @Rule 074 public TestName name = new TestName(); 075 static final int BOUNDARY = 5; 076 static final AtomicInteger UNFILTERED = new AtomicInteger(); 077 static final AtomicInteger FILTERED = new AtomicInteger(); 078 079 /** 080 * Test filter. Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many 081 * items we filter out and we count how many cells make it through for distribution way down below 082 * in the Table#batch implementation. Puts in place a custom DevNullConnection so we can insert 083 * our counting Table. 084 */ 085 @Test 086 public void testWALEntryFilter() throws IOException { 087 Configuration conf = HBaseConfiguration.create(); 088 // Make it so our filter is instantiated on construction of ReplicationSink. 089 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 090 DevNullConnectionRegistry.class, ConnectionRegistry.class); 091 conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, 092 IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class); 093 conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL, 094 DevNullAsyncClusterConnection.class, AsyncClusterConnection.class); 095 ReplicationSink sink = new ReplicationSink(conf, null); 096 // Create some dumb walentries. 097 List<AdminProtos.WALEntry> entries = new ArrayList<>(); 098 AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); 099 // Need a tablename. 100 ByteString tableName = 101 ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString()); 102 // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos 103 // describing the edit with all Cells from all edits aggregated in a single CellScanner. 104 final List<Cell> cells = new ArrayList<>(); 105 int count = BOUNDARY * 2; 106 for (int i = 0; i < count; i++) { 107 byte[] bytes = Bytes.toBytes(i); 108 // Create a wal entry. Everything is set to the current index as bytes or int/long. 109 entryBuilder.clear(); 110 entryBuilder.setKey(entryBuilder.getKeyBuilder().setLogSequenceNumber(i) 111 .setEncodedRegionName(ByteString.copyFrom(bytes)).setWriteTime(i).setTableName(tableName) 112 .build()); 113 // Lets have one Cell associated with each WALEdit. 114 entryBuilder.setAssociatedCellCount(1); 115 entries.add(entryBuilder.build()); 116 // We need to add a Cell per WALEdit to the cells array. 117 CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY); 118 // Make cells whose row, family, cell, value, and ts are == 'i'. 119 Cell cell = cellBuilder.setRow(bytes).setFamily(bytes).setQualifier(bytes) 120 .setType(Cell.Type.Put).setTimestamp(i).setValue(bytes).build(); 121 cells.add(cell); 122 } 123 // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has 124 // all Cells from all the WALEntries made above. 125 CellScanner cellScanner = new CellScanner() { 126 // Set to -1 because advance gets called before current. 127 int index = -1; 128 129 @Override 130 public Cell current() { 131 return cells.get(index); 132 } 133 134 @Override 135 public boolean advance() throws IOException { 136 index++; 137 return index < cells.size(); 138 } 139 }; 140 // Call our sink. 141 sink.replicateEntries(entries, cellScanner, null, null, null); 142 // Check what made it through and what was filtered. 143 assertTrue(FILTERED.get() > 0); 144 assertTrue(UNFILTERED.get() > 0); 145 assertEquals(count, FILTERED.get() + UNFILTERED.get()); 146 } 147 148 /** 149 * Simple filter that will filter out any entry wholse writeTime is <= 5. 150 */ 151 public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl 152 implements WALEntrySinkFilter { 153 public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() { 154 } 155 156 @Override 157 public void init(AsyncConnection conn) { 158 // Do nothing. 159 } 160 161 @Override 162 public boolean filter(TableName table, long writeTime) { 163 boolean b = writeTime <= BOUNDARY; 164 if (b) { 165 FILTERED.incrementAndGet(); 166 } 167 return b; 168 } 169 } 170 171 public static class DevNullConnectionRegistry extends DoNothingConnectionRegistry { 172 173 public DevNullConnectionRegistry(Configuration conf, User user) { 174 super(conf, user); 175 } 176 177 @Override 178 public CompletableFuture<String> getClusterId() { 179 return CompletableFuture.completedFuture("test"); 180 } 181 } 182 183 public static class DevNullAsyncClusterConnection extends DummyAsyncClusterConnection { 184 185 private final Configuration conf; 186 187 public DevNullAsyncClusterConnection(Configuration conf, Object registry, String clusterId, 188 SocketAddress localAddress, User user) { 189 this.conf = conf; 190 } 191 192 @Override 193 public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) { 194 return new DummyAsyncTable<AdvancedScanResultConsumer>() { 195 196 @Override 197 public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) { 198 List<T> list = new ArrayList<>(actions.size()); 199 for (Row action : actions) { 200 // Row is the index of the loop above where we make WALEntry and Cells. 201 int row = Bytes.toInt(action.getRow()); 202 assertTrue("" + row, row > BOUNDARY); 203 UNFILTERED.incrementAndGet(); 204 list.add(null); 205 } 206 return CompletableFuture.completedFuture(list); 207 } 208 }; 209 } 210 211 @Override 212 public Configuration getConfiguration() { 213 return conf; 214 } 215 } 216}