001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Mockito.doAnswer;
023import static org.mockito.Mockito.spy;
024
025import java.io.IOException;
026import java.util.Collection;
027import java.util.Map;
028import org.apache.commons.lang3.mutable.MutableBoolean;
029import org.apache.hadoop.hbase.DroppedSnapshotException;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.NamespaceDescriptor;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.monitoring.MonitoredTask;
044import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
045import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.wal.WAL;
050import org.junit.After;
051import org.junit.Before;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.mockito.Matchers;
056import org.mockito.invocation.InvocationOnMock;
057import org.mockito.stubbing.Answer;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Testcase for https://issues.apache.org/jira/browse/HBASE-13811
063 */
064@Category({ LargeTests.class })
065public class TestSplitWalDataLoss {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069      HBaseClassTestRule.forClass(TestSplitWalDataLoss.class);
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestSplitWalDataLoss.class);
072
073  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
074
075  private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
076      .build();
077
078  private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
079
080  private byte[] family = Bytes.toBytes("f");
081
082  private byte[] qualifier = Bytes.toBytes("q");
083
084  @Before
085  public void setUp() throws Exception {
086    testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
087    testUtil.startMiniCluster(2);
088    Admin admin = testUtil.getAdmin();
089    admin.createNamespace(namespace);
090    admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
091        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build());
092    testUtil.waitTableAvailable(tableName);
093  }
094
095  @After
096  public void tearDown() throws Exception {
097    testUtil.shutdownMiniCluster();
098  }
099
100  @Test
101  public void test() throws IOException, InterruptedException {
102    final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
103    final HRegion region = (HRegion) rs.getRegions(tableName).get(0);
104    HRegion spiedRegion = spy(region);
105    final MutableBoolean flushed = new MutableBoolean(false);
106    final MutableBoolean reported = new MutableBoolean(false);
107    doAnswer(new Answer<FlushResult>() {
108      @Override
109      public FlushResult answer(InvocationOnMock invocation) throws Throwable {
110        synchronized (flushed) {
111          flushed.setValue(true);
112          flushed.notifyAll();
113        }
114        synchronized (reported) {
115          while (!reported.booleanValue()) {
116            reported.wait();
117          }
118        }
119        rs.getWAL(region.getRegionInfo()).abortCacheFlush(
120          region.getRegionInfo().getEncodedNameAsBytes());
121        throw new DroppedSnapshotException("testcase");
122      }
123    }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
124      Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
125      Matchers.<Collection<HStore>> any());
126    // Find region key; don't pick up key for hbase:meta by mistake.
127    String key = null;
128    for (Map.Entry<String, HRegion> entry: rs.getOnlineRegions().entrySet()) {
129      if (entry.getValue().getRegionInfo().getTable().equals(this.tableName)) {
130        key = entry.getKey();
131        break;
132      }
133    }
134    rs.getOnlineRegions().put(key, spiedRegion);
135    Connection conn = testUtil.getConnection();
136
137    try (Table table = conn.getTable(tableName)) {
138      table.put(new Put(Bytes.toBytes("row0"))
139              .addColumn(family, qualifier, Bytes.toBytes("val0")));
140    }
141    long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
142    LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
143    assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
144    rs.getMemStoreFlusher().requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY);
145    synchronized (flushed) {
146      while (!flushed.booleanValue()) {
147        flushed.wait();
148      }
149    }
150    try (Table table = conn.getTable(tableName)) {
151      table.put(new Put(Bytes.toBytes("row1"))
152              .addColumn(family, qualifier, Bytes.toBytes("val1")));
153    }
154    long now = EnvironmentEdgeManager.currentTime();
155    rs.tryRegionServerReport(now - 500, now);
156    synchronized (reported) {
157      reported.setValue(true);
158      reported.notifyAll();
159    }
160    while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
161      Thread.sleep(100);
162    }
163    try (Table table = conn.getTable(tableName)) {
164      Result result = table.get(new Get(Bytes.toBytes("row0")));
165      assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
166    }
167  }
168}