001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.fail;
022
023import java.io.IOException;
024import java.util.Collections;
025import java.util.List;
026import java.util.Optional;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Mutation;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
044import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
045import org.apache.hadoop.hbase.coprocessor.ObserverContext;
046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
048import org.apache.hadoop.hbase.coprocessor.RegionObserver;
049import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
050import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
051import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
052import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.testclassification.ReplicationTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.wal.WALEdit;
057import org.apache.hadoop.hbase.wal.WALKey;
058import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.BeforeAll;
061import org.junit.jupiter.api.Tag;
062import org.junit.jupiter.api.Test;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
067
068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
070
071@Tag(ReplicationTests.TAG)
072@Tag(MediumTests.TAG)
073public class TestReplicationWithWALExtendedAttributes {
074
075  private static final Logger LOG =
076    LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class);
077
078  private static Configuration conf1 = HBaseConfiguration.create();
079
080  private static Admin replicationAdmin;
081
082  private static Connection connection1;
083
084  private static Table htable1;
085  private static Table htable2;
086
087  private static HBaseTestingUtil utility1;
088  private static HBaseTestingUtil utility2;
089  private static final long SLEEP_TIME = 500;
090  private static final int NB_RETRIES = 10;
091
092  private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithWALAnnotation");
093  private static final byte[] FAMILY = Bytes.toBytes("f");
094  private static final byte[] ROW = Bytes.toBytes("row");
095  private static final byte[] ROW2 = Bytes.toBytes("row2");
096
097  @BeforeAll
098  public static void setUpBeforeClass() throws Exception {
099    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
100    conf1.setInt("replication.source.size.capacity", 10240);
101    conf1.setLong("replication.source.sleepforretries", 100);
102    conf1.setInt("hbase.regionserver.maxlogs", 10);
103    conf1.setLong("hbase.master.logcleaner.ttl", 10);
104    conf1.setInt("zookeeper.recovery.retry", 1);
105    conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
106    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
107    conf1.setInt("replication.stats.thread.period.seconds", 5);
108    conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
109    conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
110    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
111      TestCoprocessorForWALAnnotationAtSource.class.getName());
112
113    utility1 = new HBaseTestingUtil(conf1);
114    utility1.startMiniZKCluster();
115    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
116    // Have to reget conf1 in case zk cluster location different
117    // than default
118    conf1 = utility1.getConfiguration();
119    LOG.info("Setup first Zk");
120
121    // Base conf2 on conf1 so it gets the right zk cluster.
122    Configuration conf2 = HBaseConfiguration.create(conf1);
123    conf2.setInt("hfile.format.version", 3);
124    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
125    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
126    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
127    conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
128    conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
129      TestCoprocessorForWALAnnotationAtSink.class.getName());
130    conf2.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
131      TestReplicationSinkRegionServerEndpoint.class.getName());
132
133    utility2 = new HBaseTestingUtil(conf2);
134    utility2.setZkCluster(miniZK);
135
136    LOG.info("Setup second Zk");
137    utility1.startMiniCluster(2);
138    utility2.startMiniCluster(2);
139
140    connection1 = ConnectionFactory.createConnection(conf1);
141    replicationAdmin = connection1.getAdmin();
142    ReplicationPeerConfig rpc =
143      ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI()).build();
144    replicationAdmin.addReplicationPeer("2", rpc);
145
146    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
147      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3)
148        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
149      .build();
150    try (Connection conn = ConnectionFactory.createConnection(conf1);
151      Admin admin = conn.getAdmin()) {
152      admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
153    }
154    try (Connection conn = ConnectionFactory.createConnection(conf2);
155      Admin admin = conn.getAdmin()) {
156      admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
157    }
158    htable1 = utility1.getConnection().getTable(TABLE_NAME);
159    htable2 = utility2.getConnection().getTable(TABLE_NAME);
160  }
161
162  @AfterAll
163  public static void tearDownAfterClass() throws Exception {
164    Closeables.close(replicationAdmin, true);
165    Closeables.close(connection1, true);
166    utility2.shutdownMiniCluster();
167    utility1.shutdownMiniCluster();
168  }
169
170  @Test
171  public void testReplicationWithWALExtendedAttributes() throws Exception {
172    Put put = new Put(ROW);
173    put.addColumn(FAMILY, ROW, ROW);
174
175    htable1 = utility1.getConnection().getTable(TABLE_NAME);
176    htable1.put(put);
177
178    Put put2 = new Put(ROW2);
179    put2.addColumn(FAMILY, ROW2, ROW2);
180
181    htable1.batch(Collections.singletonList(put2), new Object[1]);
182
183    assertGetValues(new Get(ROW), ROW);
184    assertGetValues(new Get(ROW2), ROW2);
185  }
186
187  private static void assertGetValues(Get get, byte[] value)
188    throws IOException, InterruptedException {
189    for (int i = 0; i < NB_RETRIES; i++) {
190      if (i == NB_RETRIES - 1) {
191        fail("Waited too much time for put replication");
192      }
193      Result res = htable2.get(get);
194      if (res.isEmpty()) {
195        LOG.info("Row not available");
196        Thread.sleep(SLEEP_TIME);
197      } else {
198        assertArrayEquals(value, res.value());
199        break;
200      }
201    }
202  }
203
204  public static class TestCoprocessorForWALAnnotationAtSource
205    implements RegionCoprocessor, RegionObserver {
206
207    @Override
208    public Optional<RegionObserver> getRegionObserver() {
209      return Optional.of(this);
210    }
211
212    @Override
213    public void preWALAppend(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
214      WALKey key, WALEdit edit) throws IOException {
215      key.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01"));
216      key.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02"));
217    }
218  }
219
220  public static class TestCoprocessorForWALAnnotationAtSink
221    implements RegionCoprocessor, RegionObserver {
222
223    @Override
224    public Optional<RegionObserver> getRegionObserver() {
225      return Optional.of(this);
226    }
227
228    @Override
229    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put,
230      WALEdit edit) throws IOException {
231      String attrVal1 = Bytes.toString(put.getAttribute("extendedAttr1"));
232      String attrVal2 = Bytes.toString(put.getAttribute("extendedAttr2"));
233      if (attrVal1 == null || attrVal2 == null) {
234        throw new IOException("Failed to retrieve WAL annotations");
235      }
236      if (
237        attrVal1.equals("Value of Extended attribute 01")
238          && attrVal2.equals("Value of Extended attribute 02")
239      ) {
240        return;
241      }
242      throw new IOException("Failed to retrieve WAL annotations..");
243    }
244
245    @Override
246    public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c,
247      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
248      String attrVal1 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr1"));
249      String attrVal2 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr2"));
250      if (attrVal1 == null || attrVal2 == null) {
251        throw new IOException("Failed to retrieve WAL annotations");
252      }
253      if (
254        attrVal1.equals("Value of Extended attribute 01")
255          && attrVal2.equals("Value of Extended attribute 02")
256      ) {
257        return;
258      }
259      throw new IOException("Failed to retrieve WAL annotations..");
260    }
261  }
262
263  public static final class TestReplicationSinkRegionServerEndpoint
264    implements RegionServerCoprocessor, RegionServerObserver {
265
266    @Override
267    public Optional<RegionServerObserver> getRegionServerObserver() {
268      return Optional.of(this);
269    }
270
271    @Override
272    public void preReplicationSinkBatchMutate(
273      ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
274      Mutation mutation) throws IOException {
275      RegionServerObserver.super.preReplicationSinkBatchMutate(ctx, walEntry, mutation);
276      List<WALProtos.Attribute> attributeList = walEntry.getKey().getExtendedAttributesList();
277      attachWALExtendedAttributesToMutation(mutation, attributeList);
278    }
279
280    @Override
281    public void postReplicationSinkBatchMutate(
282      ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
283      Mutation mutation) throws IOException {
284      RegionServerObserver.super.postReplicationSinkBatchMutate(ctx, walEntry, mutation);
285      LOG.info("WALEntry extended attributes: {}", walEntry.getKey().getExtendedAttributesList());
286      LOG.info("Mutation attributes: {}", mutation.getAttributesMap());
287    }
288
289    private void attachWALExtendedAttributesToMutation(Mutation mutation,
290      List<WALProtos.Attribute> attributeList) {
291      if (attributeList != null) {
292        for (WALProtos.Attribute attribute : attributeList) {
293          mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
294        }
295      }
296    }
297  }
298
299}