001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.client.Get;
028import org.apache.hadoop.hbase.client.Put;
029import org.apache.hadoop.hbase.client.Result;
030import org.apache.hadoop.hbase.replication.TestReplicationBase;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.junit.AfterClass;
034import org.junit.BeforeClass;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@Category(MediumTests.class)
042public class TestReplicationCompressedWAL extends TestReplicationBase {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046      HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class);
047
048  static final Logger LOG = LoggerFactory.getLogger(TestReplicationCompressedWAL.class);
049  static final int NUM_BATCHES = 20;
050  static final int NUM_ROWS_PER_BATCH = 100;
051
052  @BeforeClass
053  public static void setUpBeforeClass() throws Exception {
054    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
055    TestReplicationBase.setUpBeforeClass();
056  }
057
058  @AfterClass
059  public static void tearDownAfterClass() throws Exception {
060    TestReplicationBase.tearDownAfterClass();
061  }
062
063  @Test
064  public void testMultiplePuts() throws Exception {
065    runMultiplePutTest();
066  }
067
068  protected static void runMultiplePutTest() throws IOException, InterruptedException {
069    for (int i = 0; i < NUM_BATCHES; i++) {
070      putBatch(i);
071      getBatch(i);
072    }
073  }
074
075  protected static void getBatch(int batch) throws IOException, InterruptedException {
076    for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
077      byte[] row = getRowKey(batch, i);
078      Get get = new Get(row);
079      for (int j = 0; j < NB_RETRIES; j++) {
080        if (j == NB_RETRIES - 1) {
081          fail("Waited too much time for replication");
082        }
083        Result res = htable2.get(get);
084        if (res.isEmpty()) {
085          LOG.info("Row not available");
086          Thread.sleep(SLEEP_TIME);
087        } else {
088          assertArrayEquals(row, res.value());
089          break;
090        }
091      }
092    }
093  }
094
095  protected static byte[] getRowKey(int batch, int count) {
096    return Bytes.toBytes("row" + ((batch * NUM_ROWS_PER_BATCH) + count));
097  }
098
099  protected static void putBatch(int batch) throws IOException {
100    for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
101      byte[] row = getRowKey(batch, i);
102      Put put = new Put(row);
103      put.addColumn(famName, row, row);
104      htable1.put(put);
105    }
106  }
107
108}