001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Optional;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ArrayBackedTag;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.KeyValueUtil;
039import org.apache.hadoop.hbase.PrivateCellUtil;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Tag;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.client.Durability;
047import org.apache.hadoop.hbase.client.Get;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.Result;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
054import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
055import org.apache.hadoop.hbase.coprocessor.ObserverContext;
056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
057import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
058import org.apache.hadoop.hbase.coprocessor.RegionObserver;
059import org.apache.hadoop.hbase.testclassification.MediumTests;
060import org.apache.hadoop.hbase.testclassification.ReplicationTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
064import org.junit.AfterClass;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
073
074@Category({ ReplicationTests.class, MediumTests.class })
075public class TestReplicationWithTags {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079    HBaseClassTestRule.forClass(TestReplicationWithTags.class);
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTags.class);
082  private static final byte TAG_TYPE = 1;
083
084  private static Configuration conf1 = HBaseConfiguration.create();
085  private static Configuration conf2;
086
087  private static Admin replicationAdmin;
088
089  private static Connection connection1;
090
091  private static Table htable1;
092  private static Table htable2;
093
094  private static HBaseTestingUtil utility1;
095  private static HBaseTestingUtil utility2;
096  private static final long SLEEP_TIME = 500;
097  private static final int NB_RETRIES = 10;
098
099  private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithTags");
100  private static final byte[] FAMILY = Bytes.toBytes("f");
101  private static final byte[] ROW = Bytes.toBytes("row");
102
103  @BeforeClass
104  public static void setUpBeforeClass() throws Exception {
105    conf1.setInt("hfile.format.version", 3);
106    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
107    conf1.setInt("replication.source.size.capacity", 10240);
108    conf1.setLong("replication.source.sleepforretries", 100);
109    conf1.setInt("hbase.regionserver.maxlogs", 10);
110    conf1.setLong("hbase.master.logcleaner.ttl", 10);
111    conf1.setInt("zookeeper.recovery.retry", 1);
112    conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
113    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
114    conf1.setInt("replication.stats.thread.period.seconds", 5);
115    conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
116    conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
117    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
118      TestCoprocessorForTagsAtSource.class.getName());
119
120    utility1 = new HBaseTestingUtil(conf1);
121    utility1.startMiniZKCluster();
122    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
123    // Have to reget conf1 in case zk cluster location different
124    // than default
125    conf1 = utility1.getConfiguration();
126    LOG.info("Setup first Zk");
127
128    // Base conf2 on conf1 so it gets the right zk cluster.
129    conf2 = HBaseConfiguration.create(conf1);
130    conf2.setInt("hfile.format.version", 3);
131    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
132    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
133    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
134    conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
135    conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
136      TestCoprocessorForTagsAtSink.class.getName());
137
138    utility2 = new HBaseTestingUtil(conf2);
139    utility2.setZkCluster(miniZK);
140
141    LOG.info("Setup second Zk");
142    utility1.startMiniCluster(2);
143    utility2.startMiniCluster(2);
144
145    connection1 = ConnectionFactory.createConnection(conf1);
146    replicationAdmin = connection1.getAdmin();
147    ReplicationPeerConfig rpc =
148      ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()).build();
149    replicationAdmin.addReplicationPeer("2", rpc);
150
151    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
152      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3)
153        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
154      .build();
155    try (Connection conn = ConnectionFactory.createConnection(conf1);
156      Admin admin = conn.getAdmin()) {
157      admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
158    }
159    try (Connection conn = ConnectionFactory.createConnection(conf2);
160      Admin admin = conn.getAdmin()) {
161      admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
162    }
163    htable1 = utility1.getConnection().getTable(TABLE_NAME);
164    htable2 = utility2.getConnection().getTable(TABLE_NAME);
165  }
166
167  @AfterClass
168  public static void tearDownAfterClass() throws Exception {
169    Closeables.close(replicationAdmin, true);
170    Closeables.close(connection1, true);
171    utility2.shutdownMiniCluster();
172    utility1.shutdownMiniCluster();
173  }
174
175  @Test
176  public void testReplicationWithCellTags() throws Exception {
177    LOG.info("testSimplePutDelete");
178    Put put = new Put(ROW);
179    put.setAttribute("visibility", Bytes.toBytes("myTag3"));
180    put.addColumn(FAMILY, ROW, ROW);
181
182    htable1 = utility1.getConnection().getTable(TABLE_NAME);
183    htable1.put(put);
184
185    Get get = new Get(ROW);
186    try {
187      for (int i = 0; i < NB_RETRIES; i++) {
188        if (i == NB_RETRIES - 1) {
189          fail("Waited too much time for put replication");
190        }
191        Result res = htable2.get(get);
192        if (res.isEmpty()) {
193          LOG.info("Row not available");
194          Thread.sleep(SLEEP_TIME);
195        } else {
196          assertArrayEquals(ROW, res.value());
197          assertEquals(1, TestCoprocessorForTagsAtSink.TAGS.size());
198          Tag tag = TestCoprocessorForTagsAtSink.TAGS.get(0);
199          assertEquals(TAG_TYPE, tag.getType());
200          break;
201        }
202      }
203    } finally {
204      TestCoprocessorForTagsAtSink.TAGS = null;
205    }
206  }
207
208  public static class TestCoprocessorForTagsAtSource implements RegionCoprocessor, RegionObserver {
209    @Override
210    public Optional<RegionObserver> getRegionObserver() {
211      return Optional.of(this);
212    }
213
214    @Override
215    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
216      final Put put, final WALEdit edit, final Durability durability) throws IOException {
217      byte[] attribute = put.getAttribute("visibility");
218      byte[] cf = null;
219      List<Cell> updatedCells = new ArrayList<>();
220      if (attribute != null) {
221        for (List<? extends Cell> edits : put.getFamilyCellMap().values()) {
222          for (Cell cell : edits) {
223            KeyValue kv = KeyValueUtil.ensureKeyValue((ExtendedCell) cell);
224            if (cf == null) {
225              cf = CellUtil.cloneFamily(kv);
226            }
227            Tag tag = new ArrayBackedTag(TAG_TYPE, attribute);
228            List<Tag> tagList = new ArrayList<>(1);
229            tagList.add(tag);
230
231            KeyValue newKV =
232              new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(), CellUtil.cloneFamily(kv), 0,
233                kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0, kv.getQualifierLength(),
234                kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
235                CellUtil.cloneValue(kv), 0, kv.getValueLength(), tagList);
236            ((List<Cell>) updatedCells).add(newKV);
237          }
238        }
239        put.getFamilyCellMap().remove(cf);
240        // Update the family map
241        put.getFamilyCellMap().put(cf, updatedCells);
242      }
243    }
244  }
245
246  public static class TestCoprocessorForTagsAtSink implements RegionCoprocessor, RegionObserver {
247    private static List<Tag> TAGS = null;
248
249    @Override
250    public Optional<RegionObserver> getRegionObserver() {
251      return Optional.of(this);
252    }
253
254    @Override
255    public void postGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get,
256      List<Cell> results) throws IOException {
257      if (results.size() > 0) {
258        // Check tag presence in the 1st cell in 1st Result
259        if (!results.isEmpty()) {
260          Cell cell = results.get(0);
261          TAGS = PrivateCellUtil.getTags((ExtendedCell) cell);
262        }
263      }
264    }
265  }
266}