001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.fail; 022 023import java.io.IOException; 024import java.util.Collections; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Get; 037import org.apache.hadoop.hbase.client.Mutation; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.Result; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.client.TableDescriptor; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 044import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 045import org.apache.hadoop.hbase.coprocessor.ObserverContext; 046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 048import org.apache.hadoop.hbase.coprocessor.RegionObserver; 049import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 050import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 051import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 052import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.testclassification.ReplicationTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.wal.WALEdit; 057import org.apache.hadoop.hbase.wal.WALKey; 058import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 059import org.junit.jupiter.api.AfterAll; 060import org.junit.jupiter.api.BeforeAll; 061import org.junit.jupiter.api.Tag; 062import org.junit.jupiter.api.Test; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 067 068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 070 071@Tag(ReplicationTests.TAG) 072@Tag(MediumTests.TAG) 073public class TestReplicationWithWALExtendedAttributes { 074 075 private static final Logger LOG = 076 LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class); 077 078 private static Configuration conf1 = HBaseConfiguration.create(); 079 080 private static Admin replicationAdmin; 081 082 private static Connection connection1; 083 084 private static Table htable1; 085 private static Table htable2; 086 087 private static HBaseTestingUtil utility1; 088 private static HBaseTestingUtil utility2; 089 private static final long SLEEP_TIME = 500; 090 private static final int NB_RETRIES = 10; 091 092 private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithWALAnnotation"); 093 private static final byte[] FAMILY = Bytes.toBytes("f"); 094 private static final byte[] ROW = Bytes.toBytes("row"); 095 private static final byte[] ROW2 = Bytes.toBytes("row2"); 096 097 @BeforeAll 098 public static void setUpBeforeClass() throws Exception { 099 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 100 conf1.setInt("replication.source.size.capacity", 10240); 101 conf1.setLong("replication.source.sleepforretries", 100); 102 conf1.setInt("hbase.regionserver.maxlogs", 10); 103 conf1.setLong("hbase.master.logcleaner.ttl", 10); 104 conf1.setInt("zookeeper.recovery.retry", 1); 105 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); 106 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 107 conf1.setInt("replication.stats.thread.period.seconds", 5); 108 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); 109 conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); 110 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 111 TestCoprocessorForWALAnnotationAtSource.class.getName()); 112 113 utility1 = new HBaseTestingUtil(conf1); 114 utility1.startMiniZKCluster(); 115 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 116 // Have to reget conf1 in case zk cluster location different 117 // than default 118 conf1 = utility1.getConfiguration(); 119 LOG.info("Setup first Zk"); 120 121 // Base conf2 on conf1 so it gets the right zk cluster. 122 Configuration conf2 = HBaseConfiguration.create(conf1); 123 conf2.setInt("hfile.format.version", 3); 124 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 125 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); 126 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); 127 conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); 128 conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 129 TestCoprocessorForWALAnnotationAtSink.class.getName()); 130 conf2.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, 131 TestReplicationSinkRegionServerEndpoint.class.getName()); 132 133 utility2 = new HBaseTestingUtil(conf2); 134 utility2.setZkCluster(miniZK); 135 136 LOG.info("Setup second Zk"); 137 utility1.startMiniCluster(2); 138 utility2.startMiniCluster(2); 139 140 connection1 = ConnectionFactory.createConnection(conf1); 141 replicationAdmin = connection1.getAdmin(); 142 ReplicationPeerConfig rpc = 143 ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()).build(); 144 replicationAdmin.addReplicationPeer("2", rpc); 145 146 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME) 147 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3) 148 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 149 .build(); 150 try (Connection conn = ConnectionFactory.createConnection(conf1); 151 Admin admin = conn.getAdmin()) { 152 admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 153 } 154 try (Connection conn = ConnectionFactory.createConnection(conf2); 155 Admin admin = conn.getAdmin()) { 156 admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 157 } 158 htable1 = utility1.getConnection().getTable(TABLE_NAME); 159 htable2 = utility2.getConnection().getTable(TABLE_NAME); 160 } 161 162 @AfterAll 163 public static void tearDownAfterClass() throws Exception { 164 Closeables.close(replicationAdmin, true); 165 Closeables.close(connection1, true); 166 utility2.shutdownMiniCluster(); 167 utility1.shutdownMiniCluster(); 168 } 169 170 @Test 171 public void testReplicationWithWALExtendedAttributes() throws Exception { 172 Put put = new Put(ROW); 173 put.addColumn(FAMILY, ROW, ROW); 174 175 htable1 = utility1.getConnection().getTable(TABLE_NAME); 176 htable1.put(put); 177 178 Put put2 = new Put(ROW2); 179 put2.addColumn(FAMILY, ROW2, ROW2); 180 181 htable1.batch(Collections.singletonList(put2), new Object[1]); 182 183 assertGetValues(new Get(ROW), ROW); 184 assertGetValues(new Get(ROW2), ROW2); 185 } 186 187 private static void assertGetValues(Get get, byte[] value) 188 throws IOException, InterruptedException { 189 for (int i = 0; i < NB_RETRIES; i++) { 190 if (i == NB_RETRIES - 1) { 191 fail("Waited too much time for put replication"); 192 } 193 Result res = htable2.get(get); 194 if (res.isEmpty()) { 195 LOG.info("Row not available"); 196 Thread.sleep(SLEEP_TIME); 197 } else { 198 assertArrayEquals(value, res.value()); 199 break; 200 } 201 } 202 } 203 204 public static class TestCoprocessorForWALAnnotationAtSource 205 implements RegionCoprocessor, RegionObserver { 206 207 @Override 208 public Optional<RegionObserver> getRegionObserver() { 209 return Optional.of(this); 210 } 211 212 @Override 213 public void preWALAppend(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, 214 WALKey key, WALEdit edit) throws IOException { 215 key.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01")); 216 key.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02")); 217 } 218 } 219 220 public static class TestCoprocessorForWALAnnotationAtSink 221 implements RegionCoprocessor, RegionObserver { 222 223 @Override 224 public Optional<RegionObserver> getRegionObserver() { 225 return Optional.of(this); 226 } 227 228 @Override 229 public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put, 230 WALEdit edit) throws IOException { 231 String attrVal1 = Bytes.toString(put.getAttribute("extendedAttr1")); 232 String attrVal2 = Bytes.toString(put.getAttribute("extendedAttr2")); 233 if (attrVal1 == null || attrVal2 == null) { 234 throw new IOException("Failed to retrieve WAL annotations"); 235 } 236 if ( 237 attrVal1.equals("Value of Extended attribute 01") 238 && attrVal2.equals("Value of Extended attribute 02") 239 ) { 240 return; 241 } 242 throw new IOException("Failed to retrieve WAL annotations.."); 243 } 244 245 @Override 246 public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 247 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 248 String attrVal1 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr1")); 249 String attrVal2 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr2")); 250 if (attrVal1 == null || attrVal2 == null) { 251 throw new IOException("Failed to retrieve WAL annotations"); 252 } 253 if ( 254 attrVal1.equals("Value of Extended attribute 01") 255 && attrVal2.equals("Value of Extended attribute 02") 256 ) { 257 return; 258 } 259 throw new IOException("Failed to retrieve WAL annotations.."); 260 } 261 } 262 263 public static final class TestReplicationSinkRegionServerEndpoint 264 implements RegionServerCoprocessor, RegionServerObserver { 265 266 @Override 267 public Optional<RegionServerObserver> getRegionServerObserver() { 268 return Optional.of(this); 269 } 270 271 @Override 272 public void preReplicationSinkBatchMutate( 273 ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry, 274 Mutation mutation) throws IOException { 275 RegionServerObserver.super.preReplicationSinkBatchMutate(ctx, walEntry, mutation); 276 List<WALProtos.Attribute> attributeList = walEntry.getKey().getExtendedAttributesList(); 277 attachWALExtendedAttributesToMutation(mutation, attributeList); 278 } 279 280 @Override 281 public void postReplicationSinkBatchMutate( 282 ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry, 283 Mutation mutation) throws IOException { 284 RegionServerObserver.super.postReplicationSinkBatchMutate(ctx, walEntry, mutation); 285 LOG.info("WALEntry extended attributes: {}", walEntry.getKey().getExtendedAttributesList()); 286 LOG.info("Mutation attributes: {}", mutation.getAttributesMap()); 287 } 288 289 private void attachWALExtendedAttributesToMutation(Mutation mutation, 290 List<WALProtos.Attribute> attributeList) { 291 if (attributeList != null) { 292 for (WALProtos.Attribute attribute : attributeList) { 293 mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray()); 294 } 295 } 296 } 297 } 298 299}