001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Optional; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.ArrayBackedTag; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.ExtendedCell; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.KeyValueUtil; 039import org.apache.hadoop.hbase.PrivateCellUtil; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.Tag; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionFactory; 046import org.apache.hadoop.hbase.client.Durability; 047import org.apache.hadoop.hbase.client.Get; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.Result; 050import org.apache.hadoop.hbase.client.Table; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 054import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 055import org.apache.hadoop.hbase.coprocessor.ObserverContext; 056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 057import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 058import org.apache.hadoop.hbase.coprocessor.RegionObserver; 059import org.apache.hadoop.hbase.testclassification.MediumTests; 060import org.apache.hadoop.hbase.testclassification.ReplicationTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.wal.WALEdit; 063import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 064import org.junit.AfterClass; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 073 074@Category({ ReplicationTests.class, MediumTests.class }) 075public class TestReplicationWithTags { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestReplicationWithTags.class); 080 081 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTags.class); 082 private static final byte TAG_TYPE = 1; 083 084 private static Configuration conf1 = HBaseConfiguration.create(); 085 private static Configuration conf2; 086 087 private static Admin replicationAdmin; 088 089 private static Connection connection1; 090 091 private static Table htable1; 092 private static Table htable2; 093 094 private static HBaseTestingUtil utility1; 095 private static HBaseTestingUtil utility2; 096 private static final long SLEEP_TIME = 500; 097 private static final int NB_RETRIES = 10; 098 099 private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithTags"); 100 private static final byte[] FAMILY = Bytes.toBytes("f"); 101 private static final byte[] ROW = Bytes.toBytes("row"); 102 103 @BeforeClass 104 public static void setUpBeforeClass() throws Exception { 105 conf1.setInt("hfile.format.version", 3); 106 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); 107 conf1.setInt("replication.source.size.capacity", 10240); 108 conf1.setLong("replication.source.sleepforretries", 100); 109 conf1.setInt("hbase.regionserver.maxlogs", 10); 110 conf1.setLong("hbase.master.logcleaner.ttl", 10); 111 conf1.setInt("zookeeper.recovery.retry", 1); 112 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); 113 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 114 conf1.setInt("replication.stats.thread.period.seconds", 5); 115 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); 116 conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); 117 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 118 TestCoprocessorForTagsAtSource.class.getName()); 119 120 utility1 = new HBaseTestingUtil(conf1); 121 utility1.startMiniZKCluster(); 122 MiniZooKeeperCluster miniZK = utility1.getZkCluster(); 123 // Have to reget conf1 in case zk cluster location different 124 // than default 125 conf1 = utility1.getConfiguration(); 126 LOG.info("Setup first Zk"); 127 128 // Base conf2 on conf1 so it gets the right zk cluster. 129 conf2 = HBaseConfiguration.create(conf1); 130 conf2.setInt("hfile.format.version", 3); 131 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); 132 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); 133 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); 134 conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName()); 135 conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 136 TestCoprocessorForTagsAtSink.class.getName()); 137 138 utility2 = new HBaseTestingUtil(conf2); 139 utility2.setZkCluster(miniZK); 140 141 LOG.info("Setup second Zk"); 142 utility1.startMiniCluster(2); 143 utility2.startMiniCluster(2); 144 145 connection1 = ConnectionFactory.createConnection(conf1); 146 replicationAdmin = connection1.getAdmin(); 147 ReplicationPeerConfig rpc = 148 ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()).build(); 149 replicationAdmin.addReplicationPeer("2", rpc); 150 151 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME) 152 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3) 153 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 154 .build(); 155 try (Connection conn = ConnectionFactory.createConnection(conf1); 156 Admin admin = conn.getAdmin()) { 157 admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 158 } 159 try (Connection conn = ConnectionFactory.createConnection(conf2); 160 Admin admin = conn.getAdmin()) { 161 admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 162 } 163 htable1 = utility1.getConnection().getTable(TABLE_NAME); 164 htable2 = utility2.getConnection().getTable(TABLE_NAME); 165 } 166 167 @AfterClass 168 public static void tearDownAfterClass() throws Exception { 169 Closeables.close(replicationAdmin, true); 170 Closeables.close(connection1, true); 171 utility2.shutdownMiniCluster(); 172 utility1.shutdownMiniCluster(); 173 } 174 175 @Test 176 public void testReplicationWithCellTags() throws Exception { 177 LOG.info("testSimplePutDelete"); 178 Put put = new Put(ROW); 179 put.setAttribute("visibility", Bytes.toBytes("myTag3")); 180 put.addColumn(FAMILY, ROW, ROW); 181 182 htable1 = utility1.getConnection().getTable(TABLE_NAME); 183 htable1.put(put); 184 185 Get get = new Get(ROW); 186 try { 187 for (int i = 0; i < NB_RETRIES; i++) { 188 if (i == NB_RETRIES - 1) { 189 fail("Waited too much time for put replication"); 190 } 191 Result res = htable2.get(get); 192 if (res.isEmpty()) { 193 LOG.info("Row not available"); 194 Thread.sleep(SLEEP_TIME); 195 } else { 196 assertArrayEquals(ROW, res.value()); 197 assertEquals(1, TestCoprocessorForTagsAtSink.TAGS.size()); 198 Tag tag = TestCoprocessorForTagsAtSink.TAGS.get(0); 199 assertEquals(TAG_TYPE, tag.getType()); 200 break; 201 } 202 } 203 } finally { 204 TestCoprocessorForTagsAtSink.TAGS = null; 205 } 206 } 207 208 public static class TestCoprocessorForTagsAtSource implements RegionCoprocessor, RegionObserver { 209 @Override 210 public Optional<RegionObserver> getRegionObserver() { 211 return Optional.of(this); 212 } 213 214 @Override 215 public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 216 final Put put, final WALEdit edit, final Durability durability) throws IOException { 217 byte[] attribute = put.getAttribute("visibility"); 218 byte[] cf = null; 219 List<Cell> updatedCells = new ArrayList<>(); 220 if (attribute != null) { 221 for (List<? extends Cell> edits : put.getFamilyCellMap().values()) { 222 for (Cell cell : edits) { 223 KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) cell); 224 if (cf == null) { 225 cf = CellUtil.cloneFamily(kv); 226 } 227 Tag tag = new ArrayBackedTag(TAG_TYPE, attribute); 228 List<Tag> tagList = new ArrayList<>(1); 229 tagList.add(tag); 230 231 KeyValue newKV = 232 new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(), CellUtil.cloneFamily(kv), 0, 233 kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0, kv.getQualifierLength(), 234 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()), 235 CellUtil.cloneValue(kv), 0, kv.getValueLength(), tagList); 236 ((List<Cell>) updatedCells).add(newKV); 237 } 238 } 239 put.getFamilyCellMap().remove(cf); 240 // Update the family map 241 put.getFamilyCellMap().put(cf, updatedCells); 242 } 243 } 244 } 245 246 public static class TestCoprocessorForTagsAtSink implements RegionCoprocessor, RegionObserver { 247 private static List<Tag> TAGS = null; 248 249 @Override 250 public Optional<RegionObserver> getRegionObserver() { 251 return Optional.of(this); 252 } 253 254 @Override 255 public void postGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get, 256 List<Cell> results) throws IOException { 257 if (results.size() > 0) { 258 // Check tag presence in the 1st cell in 1st Result 259 if (!results.isEmpty()) { 260 Cell cell = results.get(0); 261 TAGS = PrivateCellUtil.getTags((ExtendedCell) cell); 262 } 263 } 264 } 265 } 266}