001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import javax.validation.constraints.Null;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.HBaseTestingUtil;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Delete;
027import org.apache.hadoop.hbase.client.Durability;
028import org.apache.hadoop.hbase.client.Mutation;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.testclassification.MediumTests;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.mapreduce.JobContext;
033import org.apache.hadoop.mapreduce.OutputCommitter;
034import org.apache.hadoop.mapreduce.RecordWriter;
035import org.apache.hadoop.mapreduce.TaskAttemptContext;
036import org.junit.After;
037import org.junit.AfterClass;
038import org.junit.Assert;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.Mockito;
044
045/**
046 * Simple Tests to check whether the durability of the Mutation is changed or not, for
047 * {@link TableOutputFormat} if {@link TableOutputFormat#WAL_PROPERTY} is set to false.
048 */
049@Category(MediumTests.class)
050public class TestTableOutputFormat {
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestTableOutputFormat.class);
054
055  private static final HBaseTestingUtil util = new HBaseTestingUtil();
056  private static final TableName TABLE_NAME = TableName.valueOf("TEST_TABLE");
057  private static final byte[] columnFamily = Bytes.toBytes("f");
058  private static Configuration conf;
059  private static RecordWriter<Null, Mutation> writer;
060  private static TaskAttemptContext context;
061  private static TableOutputFormat<Null> tableOutputFormat;
062
063  @BeforeClass
064  public static void setUp() throws Exception {
065    util.startMiniCluster();
066    util.createTable(TABLE_NAME, columnFamily);
067
068    conf = new Configuration(util.getConfiguration());
069    context = Mockito.mock(TaskAttemptContext.class);
070    tableOutputFormat = new TableOutputFormat<>();
071    conf.set(TableOutputFormat.OUTPUT_TABLE, "TEST_TABLE");
072  }
073
074  @AfterClass
075  public static void tearDown() throws Exception {
076    util.shutdownMiniCluster();
077  }
078
079  @After
080  public void close() throws IOException, InterruptedException {
081    if (writer != null && context != null) {
082      writer.close(context);
083    }
084  }
085
086  @Test
087  public void testTableOutputFormatWhenWalIsOFFForPut() throws IOException, InterruptedException {
088    // setting up the configuration for the TableOutputFormat, with writing to the WAL off.
089    conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
090    tableOutputFormat.setConf(conf);
091
092    writer = tableOutputFormat.getRecordWriter(context);
093
094    // creating mutation of the type put
095    Put put = new Put("row1".getBytes());
096    put.addColumn(columnFamily, Bytes.toBytes("aa"), Bytes.toBytes("value"));
097
098    // verifying whether durability of mutation is USE_DEFAULT or not, before commiting write.
099    Assert.assertEquals("Durability of the mutation should be USE_DEFAULT", Durability.USE_DEFAULT,
100      put.getDurability());
101
102    writer.write(null, put);
103
104    // verifying whether durability of mutation got changed to the SKIP_WAL or not.
105    Assert.assertEquals("Durability of the mutation should be SKIP_WAL", Durability.SKIP_WAL,
106      put.getDurability());
107  }
108
109  @Test
110  public void testTableOutputFormatWhenWalIsOFFForDelete()
111    throws IOException, InterruptedException {
112    // setting up the configuration for the TableOutputFormat, with writing to the WAL off.
113    conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
114    tableOutputFormat.setConf(conf);
115
116    writer = tableOutputFormat.getRecordWriter(context);
117
118    // creating mutation of the type delete
119    Delete delete = new Delete("row2".getBytes());
120    delete.addColumn(columnFamily, Bytes.toBytes("aa"));
121
122    // verifying whether durability of mutation is USE_DEFAULT or not, before commiting write.
123    Assert.assertEquals("Durability of the mutation should be USE_DEFAULT", Durability.USE_DEFAULT,
124      delete.getDurability());
125
126    writer.write(null, delete);
127
128    // verifying whether durability of mutation got changed from USE_DEFAULT to the SKIP_WAL or not.
129    Assert.assertEquals("Durability of the mutation should be SKIP_WAL", Durability.SKIP_WAL,
130      delete.getDurability());
131  }
132
133  @Test
134  public void testOutputCommitterConfiguration() throws IOException, InterruptedException {
135    // 1. Verify it returns the default committer when the property is not set.
136    conf.unset(TableOutputFormat.OUTPUT_COMMITTER_CLASS);
137    tableOutputFormat.setConf(conf);
138    Assert.assertEquals("Should use default committer", TableOutputCommitter.class,
139      tableOutputFormat.getOutputCommitter(context).getClass());
140
141    // 2. Verify it returns the custom committer when the property is set.
142    conf.set(TableOutputFormat.OUTPUT_COMMITTER_CLASS, DummyCommitter.class.getName());
143    tableOutputFormat.setConf(conf);
144    Assert.assertEquals("Should use custom committer", DummyCommitter.class,
145      tableOutputFormat.getOutputCommitter(context).getClass());
146  }
147
148  // Simple dummy committer for testing
149  public static class DummyCommitter extends OutputCommitter {
150    @Override
151    public void setupJob(JobContext jobContext) {
152    }
153
154    @Override
155    public void setupTask(TaskAttemptContext taskContext) {
156    }
157
158    @Override
159    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
160      return false;
161    }
162
163    @Override
164    public void commitTask(TaskAttemptContext taskContext) {
165    }
166
167    @Override
168    public void abortTask(TaskAttemptContext taskContext) {
169    }
170  }
171}