/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
import org.apache.hadoop.hbase.client.DummyAsyncClusterConnection;
import org.apache.hadoop.hbase.client.DummyAsyncTable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
 * Simple test of sink-side wal entry filter facility.
 */
@Category({ ReplicationTests.class, SmallTests.class })
public class TestWALEntrySinkFilter {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class);

  /** Supplies the running test method's name, which the test uses as its table name. */
  @Rule
  public TestName name = new TestName();

  /** Entries whose write time is {@code <= BOUNDARY} are filtered; later ones pass through. */
  static final int BOUNDARY = 5;

  /** Number of rows that reached the counting table's {@code batchAll}. */
  static final AtomicInteger UNFILTERED = new AtomicInteger();

  /** Number of times the test filter reported that an entry should be filtered out. */
  static final AtomicInteger FILTERED = new AtomicInteger();

  /**
   * Test filter. Filter will filter out any write time that is &lt;= 5 (BOUNDARY). We count how
   * many items we filter out and we count how many rows make it through to the
   * {@code batchAll} implementation of the table handed out by
   * {@link DevNullAsyncClusterConnection}, which is installed via configuration so that our
   * counting table is the one the sink writes to.
   * @throws IOException if the sink fails while replicating the entries
   */
  @Test
  public void testWALEntryFilter() throws IOException {
    // The counters are static and therefore outlive a single invocation. Zero them so that a
    // repeated run in the same JVM does not see counts left over from an earlier run.
    FILTERED.set(0);
    UNFILTERED.set(0);

    Configuration conf = HBaseConfiguration.create();
    // Registry that answers getClusterId with a fixed value.
    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
      DevNullConnectionRegistry.class, ConnectionRegistry.class);
    // Make it so our filter is instantiated on construction of ReplicationSink.
    conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
      IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
    // Connection whose tables count the rows they are handed.
    conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
      DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
    ReplicationSink sink = new ReplicationSink(conf, null);
    // Create some dumb walentries.
    List<AdminProtos.WALEntry> entries = new ArrayList<>();
    AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
    // Need a tablename.
    ByteString tableName =
      ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
    // Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
    // describing the edit with all Cells from all edits aggregated in a single CellScanner.
    final List<ExtendedCell> cells = new ArrayList<>();
    int count = BOUNDARY * 2;
    for (int i = 0; i < count; i++) {
      byte[] bytes = Bytes.toBytes(i);
      // Create a wal entry. Everything is set to the current index as bytes or int/long.
      entryBuilder.clear();
      entryBuilder.setKey(entryBuilder.getKeyBuilder().setLogSequenceNumber(i)
        .setEncodedRegionName(ByteString.copyFrom(bytes)).setWriteTime(i).setTableName(tableName)
        .build());
      // Lets have one Cell associated with each WALEdit.
      entryBuilder.setAssociatedCellCount(1);
      entries.add(entryBuilder.build());
      // We need to add a Cell per WALEdit to the cells array.
      ExtendedCellBuilder cellBuilder =
        ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
      // Make cells whose row, family, cell, value, and ts are == 'i'.
      ExtendedCell cell = cellBuilder.setRow(bytes).setFamily(bytes).setQualifier(bytes)
        .setType(Cell.Type.Put).setTimestamp(i).setValue(bytes).build();
      cells.add(cell);
    }
    // Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
    // all Cells from all the WALEntries made above.
    ExtendedCellScanner cellScanner = new ExtendedCellScanner() {
      // Set to -1 because advance gets called before current.
      int index = -1;

      @Override
      public ExtendedCell current() {
        return cells.get(index);
      }

      @Override
      public boolean advance() throws IOException {
        index++;
        return index < cells.size();
      }
    };
    // Call our sink.
    sink.replicateEntries(entries, cellScanner, null, null, null);
    // Check what made it through and what was filtered.
    assertTrue(FILTERED.get() > 0);
    assertTrue(UNFILTERED.get() > 0);
    assertEquals(count, FILTERED.get() + UNFILTERED.get());
  }

  /**
   * Simple filter that will filter out any entry whose writeTime is &lt;= 5 (BOUNDARY). Each
   * positive answer is recorded in {@link #FILTERED}.
   */
  public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl
    implements WALEntrySinkFilter {
    public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {
    }

    @Override
    public void init(AsyncConnection conn) {
      // Do nothing.
    }

    /**
     * @param table     table the entry belongs to; not consulted by this filter
     * @param writeTime write time of the entry
     * @return {@code true} when {@code writeTime <= BOUNDARY}
     */
    @Override
    public boolean filter(TableName table, long writeTime) {
      boolean b = writeTime <= BOUNDARY;
      if (b) {
        FILTERED.incrementAndGet();
      }
      return b;
    }
  }

  /**
   * Connection registry whose {@link #getClusterId()} completes immediately with the fixed id
   * {@code "test"}; everything else is inherited from {@link DoNothingConnectionRegistry}.
   */
  public static class DevNullConnectionRegistry extends DoNothingConnectionRegistry {

    public DevNullConnectionRegistry(Configuration conf, User user) {
      super(conf, user);
    }

    @Override
    public CompletableFuture<String> getClusterId() {
      return CompletableFuture.completedFuture("test");
    }
  }

  /**
   * Cluster connection that only keeps the configuration it was built with and hands out tables
   * whose {@code batchAll} counts incoming rows instead of writing them anywhere.
   */
  public static class DevNullAsyncClusterConnection extends DummyAsyncClusterConnection {

    private final Configuration conf;

    /**
     * Only {@code conf} is retained; the remaining arguments are accepted and ignored.
     */
    public DevNullAsyncClusterConnection(Configuration conf, Object registry, String clusterId,
      SocketAddress localAddress, User user) {
      this.conf = conf;
    }

    @Override
    public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) {
      return new DummyAsyncTable<AdvancedScanResultConsumer>() {

        @Override
        public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
          List<T> list = new ArrayList<>(actions.size());
          for (Row action : actions) {
            // Row is the index of the loop above where we make WALEntry and Cells.
            int row = Bytes.toInt(action.getRow());
            // Anything at or below the boundary should have been dropped by the filter.
            assertTrue("" + row, row > BOUNDARY);
            UNFILTERED.incrementAndGet();
            // One null placeholder per action keeps the result list the same size as the input.
            list.add(null);
          }
          return CompletableFuture.completedFuture(list);
        }
      };
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }
  }
}