001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver.wal;
020
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertFalse;
023
024import java.io.IOException;
025import java.lang.reflect.Method;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.fs.FSDataOutputStream;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.fs.StreamCapabilities;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Get;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.testclassification.LargeTests;
038import org.apache.hadoop.hbase.testclassification.RegionServerTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.CommonFSUtils;
041import org.apache.hadoop.hbase.wal.WALFactory;
042import org.apache.hadoop.hdfs.DFSClient;
043import org.apache.hadoop.hdfs.DFSTestUtil;
044import org.apache.hadoop.hdfs.DistributedFileSystem;
045import org.apache.hadoop.hdfs.MiniDFSCluster;
046import org.junit.After;
047import org.junit.Assume;
048import org.junit.Before;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.runner.RunWith;
054import org.junit.runners.Parameterized;
055import org.junit.runners.Parameterized.Parameter;
056import org.junit.runners.Parameterized.Parameters;
057
058@RunWith(Parameterized.class)
059@Category({ RegionServerTests.class, LargeTests.class })
060public class TestHBaseWalOnEC {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
065
066  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
067
068  @BeforeClass
069  public static void setUpBeforeClass() throws Exception {
070    try {
071      MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
072      DistributedFileSystem fs = cluster.getFileSystem();
073
074      Method enableAllECPolicies =
075        DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class);
076      enableAllECPolicies.invoke(null, fs);
077
078      DFSClient client = fs.getClient();
079      Method setErasureCodingPolicy =
080        DFSClient.class.getMethod("setErasureCodingPolicy", String.class, String.class);
081      setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy
082
083      try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
084        // If this comes back as having hflush then some test setup assumption is wrong.
085        // Fail the test so that a developer has to look and triage
086        assertFalse("Did not enable EC!", out.hasCapability(StreamCapabilities.HFLUSH));
087      }
088    } catch (NoSuchMethodException e) {
089      // We're not testing anything interesting if EC is not available, so skip the rest of the test
090      Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
091    }
092
093    UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
094
095  }
096
097  @Parameter
098  public String walProvider;
099
100  @Parameters
101  public static List<Object[]> params() {
102    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
103  }
104
105  @Before
106  public void setUp() throws Exception {
107    UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
108    UTIL.startMiniCluster(3);
109  }
110
111  @After
112  public void tearDown() throws Exception {
113    UTIL.shutdownMiniCluster();
114  }
115
116  @Test
117  public void testReadWrite() throws IOException {
118    byte[] row = Bytes.toBytes("row");
119    byte[] cf = Bytes.toBytes("cf");
120    byte[] cq = Bytes.toBytes("cq");
121    byte[] value = Bytes.toBytes("value");
122
123    TableName name = TableName.valueOf(getClass().getSimpleName());
124
125    Table t = UTIL.createTable(name, cf);
126    t.put(new Put(row).addColumn(cf, cq, value));
127
128    UTIL.getAdmin().flush(name);
129
130    assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
131  }
132}