001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.util.Collections;
025import java.util.List;
026import java.util.Optional;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Mutation;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
046import org.apache.hadoop.hbase.coprocessor.ObserverContext;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
049import org.apache.hadoop.hbase.coprocessor.RegionObserver;
050import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
051import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
052import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
053import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.testclassification.ReplicationTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.wal.WALEdit;
058import org.apache.hadoop.hbase.wal.WALKey;
059import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
060import org.junit.AfterClass;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
069
070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
072
073@Category({ ReplicationTests.class, MediumTests.class })
074public class TestReplicationWithWALExtendedAttributes {
075
076  @ClassRule
077  public static final HBaseClassTestRule CLASS_RULE =
078    HBaseClassTestRule.forClass(TestReplicationWithWALExtendedAttributes.class);
079
080  private static final Logger LOG =
081    LoggerFactory.getLogger(TestReplicationWithWALExtendedAttributes.class);
082
083  private static Configuration conf1 = HBaseConfiguration.create();
084
085  private static Admin replicationAdmin;
086
087  private static Connection connection1;
088
089  private static Table htable1;
090  private static Table htable2;
091
092  private static HBaseTestingUtil utility1;
093  private static HBaseTestingUtil utility2;
094  private static final long SLEEP_TIME = 500;
095  private static final int NB_RETRIES = 10;
096
097  private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithWALAnnotation");
098  private static final byte[] FAMILY = Bytes.toBytes("f");
099  private static final byte[] ROW = Bytes.toBytes("row");
100  private static final byte[] ROW2 = Bytes.toBytes("row2");
101
102  @BeforeClass
103  public static void setUpBeforeClass() throws Exception {
104    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
105    conf1.setInt("replication.source.size.capacity", 10240);
106    conf1.setLong("replication.source.sleepforretries", 100);
107    conf1.setInt("hbase.regionserver.maxlogs", 10);
108    conf1.setLong("hbase.master.logcleaner.ttl", 10);
109    conf1.setInt("zookeeper.recovery.retry", 1);
110    conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
111    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
112    conf1.setInt("replication.stats.thread.period.seconds", 5);
113    conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
114    conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
115    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
116      TestCoprocessorForWALAnnotationAtSource.class.getName());
117
118    utility1 = new HBaseTestingUtil(conf1);
119    utility1.startMiniZKCluster();
120    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
121    // Have to reget conf1 in case zk cluster location different
122    // than default
123    conf1 = utility1.getConfiguration();
124    LOG.info("Setup first Zk");
125
126    // Base conf2 on conf1 so it gets the right zk cluster.
127    Configuration conf2 = HBaseConfiguration.create(conf1);
128    conf2.setInt("hfile.format.version", 3);
129    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
130    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
131    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
132    conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
133    conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
134      TestCoprocessorForWALAnnotationAtSink.class.getName());
135    conf2.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
136      TestReplicationSinkRegionServerEndpoint.class.getName());
137
138    utility2 = new HBaseTestingUtil(conf2);
139    utility2.setZkCluster(miniZK);
140
141    LOG.info("Setup second Zk");
142    utility1.startMiniCluster(2);
143    utility2.startMiniCluster(2);
144
145    connection1 = ConnectionFactory.createConnection(conf1);
146    replicationAdmin = connection1.getAdmin();
147    ReplicationPeerConfig rpc =
148      ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
149    replicationAdmin.addReplicationPeer("2", rpc);
150
151    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
152      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3)
153        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
154      .build();
155    try (Connection conn = ConnectionFactory.createConnection(conf1);
156      Admin admin = conn.getAdmin()) {
157      admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
158    }
159    try (Connection conn = ConnectionFactory.createConnection(conf2);
160      Admin admin = conn.getAdmin()) {
161      admin.createTable(tableDescriptor, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
162    }
163    htable1 = utility1.getConnection().getTable(TABLE_NAME);
164    htable2 = utility2.getConnection().getTable(TABLE_NAME);
165  }
166
167  @AfterClass
168  public static void tearDownAfterClass() throws Exception {
169    Closeables.close(replicationAdmin, true);
170    Closeables.close(connection1, true);
171    utility2.shutdownMiniCluster();
172    utility1.shutdownMiniCluster();
173  }
174
175  @Test
176  public void testReplicationWithWALExtendedAttributes() throws Exception {
177    Put put = new Put(ROW);
178    put.addColumn(FAMILY, ROW, ROW);
179
180    htable1 = utility1.getConnection().getTable(TABLE_NAME);
181    htable1.put(put);
182
183    Put put2 = new Put(ROW2);
184    put2.addColumn(FAMILY, ROW2, ROW2);
185
186    htable1.batch(Collections.singletonList(put2), new Object[1]);
187
188    assertGetValues(new Get(ROW), ROW);
189    assertGetValues(new Get(ROW2), ROW2);
190  }
191
192  private static void assertGetValues(Get get, byte[] value)
193    throws IOException, InterruptedException {
194    for (int i = 0; i < NB_RETRIES; i++) {
195      if (i == NB_RETRIES - 1) {
196        fail("Waited too much time for put replication");
197      }
198      Result res = htable2.get(get);
199      if (res.isEmpty()) {
200        LOG.info("Row not available");
201        Thread.sleep(SLEEP_TIME);
202      } else {
203        assertArrayEquals(value, res.value());
204        break;
205      }
206    }
207  }
208
209  public static class TestCoprocessorForWALAnnotationAtSource
210    implements RegionCoprocessor, RegionObserver {
211
212    @Override
213    public Optional<RegionObserver> getRegionObserver() {
214      return Optional.of(this);
215    }
216
217    @Override
218    public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
219      WALEdit edit) throws IOException {
220      key.addExtendedAttribute("extendedAttr1", Bytes.toBytes("Value of Extended attribute 01"));
221      key.addExtendedAttribute("extendedAttr2", Bytes.toBytes("Value of Extended attribute 02"));
222    }
223  }
224
225  public static class TestCoprocessorForWALAnnotationAtSink
226    implements RegionCoprocessor, RegionObserver {
227
228    @Override
229    public Optional<RegionObserver> getRegionObserver() {
230      return Optional.of(this);
231    }
232
233    @Override
234    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
235      throws IOException {
236      String attrVal1 = Bytes.toString(put.getAttribute("extendedAttr1"));
237      String attrVal2 = Bytes.toString(put.getAttribute("extendedAttr2"));
238      if (attrVal1 == null || attrVal2 == null) {
239        throw new IOException("Failed to retrieve WAL annotations");
240      }
241      if (
242        attrVal1.equals("Value of Extended attribute 01")
243          && attrVal2.equals("Value of Extended attribute 02")
244      ) {
245        return;
246      }
247      throw new IOException("Failed to retrieve WAL annotations..");
248    }
249
250    @Override
251    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
252      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
253      String attrVal1 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr1"));
254      String attrVal2 = Bytes.toString(miniBatchOp.getOperation(0).getAttribute("extendedAttr2"));
255      if (attrVal1 == null || attrVal2 == null) {
256        throw new IOException("Failed to retrieve WAL annotations");
257      }
258      if (
259        attrVal1.equals("Value of Extended attribute 01")
260          && attrVal2.equals("Value of Extended attribute 02")
261      ) {
262        return;
263      }
264      throw new IOException("Failed to retrieve WAL annotations..");
265    }
266  }
267
268  public static final class TestReplicationSinkRegionServerEndpoint
269    implements RegionServerCoprocessor, RegionServerObserver {
270
271    @Override
272    public Optional<RegionServerObserver> getRegionServerObserver() {
273      return Optional.of(this);
274    }
275
276    @Override
277    public void preReplicationSinkBatchMutate(
278      ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
279      Mutation mutation) throws IOException {
280      RegionServerObserver.super.preReplicationSinkBatchMutate(ctx, walEntry, mutation);
281      List<WALProtos.Attribute> attributeList = walEntry.getKey().getExtendedAttributesList();
282      attachWALExtendedAttributesToMutation(mutation, attributeList);
283    }
284
285    @Override
286    public void postReplicationSinkBatchMutate(
287      ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
288      Mutation mutation) throws IOException {
289      RegionServerObserver.super.postReplicationSinkBatchMutate(ctx, walEntry, mutation);
290      LOG.info("WALEntry extended attributes: {}", walEntry.getKey().getExtendedAttributesList());
291      LOG.info("Mutation attributes: {}", mutation.getAttributesMap());
292    }
293
294    private void attachWALExtendedAttributesToMutation(Mutation mutation,
295      List<WALProtos.Attribute> attributeList) {
296      if (attributeList != null) {
297        for (WALProtos.Attribute attribute : attributeList) {
298          mutation.setAttribute(attribute.getKey(), attribute.getValue().toByteArray());
299        }
300      }
301    }
302  }
303
304}