001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import javax.validation.constraints.Null;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseTestingUtil;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.Delete;
028import org.apache.hadoop.hbase.client.Durability;
029import org.apache.hadoop.hbase.client.Mutation;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.mapreduce.JobContext;
034import org.apache.hadoop.mapreduce.OutputCommitter;
035import org.apache.hadoop.mapreduce.RecordWriter;
036import org.apache.hadoop.mapreduce.TaskAttemptContext;
037import org.junit.jupiter.api.AfterAll;
038import org.junit.jupiter.api.AfterEach;
039import org.junit.jupiter.api.BeforeAll;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.mockito.Mockito;
043
044/**
045 * Simple Tests to check whether the durability of the Mutation is changed or not, for
046 * {@link TableOutputFormat} if {@link TableOutputFormat#WAL_PROPERTY} is set to false.
047 */
048@Tag(MediumTests.TAG)
049public class TestTableOutputFormat {
050
051  private static final HBaseTestingUtil util = new HBaseTestingUtil();
052  private static final TableName TABLE_NAME = TableName.valueOf("TEST_TABLE");
053  private static final byte[] columnFamily = Bytes.toBytes("f");
054  private static Configuration conf;
055  private static RecordWriter<Null, Mutation> writer;
056  private static TaskAttemptContext context;
057  private static TableOutputFormat<Null> tableOutputFormat;
058
059  @BeforeAll
060  public static void setUp() throws Exception {
061    util.startMiniCluster();
062    util.createTable(TABLE_NAME, columnFamily);
063
064    conf = new Configuration(util.getConfiguration());
065    context = Mockito.mock(TaskAttemptContext.class);
066    tableOutputFormat = new TableOutputFormat<>();
067    conf.set(TableOutputFormat.OUTPUT_TABLE, "TEST_TABLE");
068  }
069
070  @AfterAll
071  public static void tearDown() throws Exception {
072    util.shutdownMiniCluster();
073  }
074
075  @AfterEach
076  public void close() throws IOException, InterruptedException {
077    if (writer != null && context != null) {
078      writer.close(context);
079    }
080  }
081
082  @Test
083  public void testTableOutputFormatWhenWalIsOFFForPut() throws IOException, InterruptedException {
084    // setting up the configuration for the TableOutputFormat, with writing to the WAL off.
085    conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
086    tableOutputFormat.setConf(conf);
087
088    writer = tableOutputFormat.getRecordWriter(context);
089
090    // creating mutation of the type put
091    Put put = new Put("row1".getBytes());
092    put.addColumn(columnFamily, Bytes.toBytes("aa"), Bytes.toBytes("value"));
093
094    // verifying whether durability of mutation is USE_DEFAULT or not, before commiting write.
095    assertEquals(Durability.USE_DEFAULT, put.getDurability(),
096      "Durability of the mutation should be USE_DEFAULT");
097
098    writer.write(null, put);
099
100    // verifying whether durability of mutation got changed to the SKIP_WAL or not.
101    assertEquals(Durability.SKIP_WAL, put.getDurability(),
102      "Durability of the mutation should be SKIP_WAL");
103  }
104
105  @Test
106  public void testTableOutputFormatWhenWalIsOFFForDelete()
107    throws IOException, InterruptedException {
108    // setting up the configuration for the TableOutputFormat, with writing to the WAL off.
109    conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
110    tableOutputFormat.setConf(conf);
111
112    writer = tableOutputFormat.getRecordWriter(context);
113
114    // creating mutation of the type delete
115    Delete delete = new Delete("row2".getBytes());
116    delete.addColumn(columnFamily, Bytes.toBytes("aa"));
117
118    // verifying whether durability of mutation is USE_DEFAULT or not, before commiting write.
119    assertEquals(Durability.USE_DEFAULT, delete.getDurability(),
120      "Durability of the mutation should be USE_DEFAULT");
121
122    writer.write(null, delete);
123
124    // verifying whether durability of mutation got changed from USE_DEFAULT to the SKIP_WAL or not.
125    assertEquals(Durability.SKIP_WAL, delete.getDurability(),
126      "Durability of the mutation should be SKIP_WAL");
127  }
128
129  @Test
130  public void testOutputCommitterConfiguration() throws IOException, InterruptedException {
131    // 1. Verify it returns the default committer when the property is not set.
132    conf.unset(TableOutputFormat.OUTPUT_COMMITTER_CLASS);
133    tableOutputFormat.setConf(conf);
134    assertEquals(TableOutputCommitter.class,
135      tableOutputFormat.getOutputCommitter(context).getClass(), "Should use default committer");
136
137    // 2. Verify it returns the custom committer when the property is set.
138    conf.set(TableOutputFormat.OUTPUT_COMMITTER_CLASS, DummyCommitter.class.getName());
139    tableOutputFormat.setConf(conf);
140    assertEquals(DummyCommitter.class, tableOutputFormat.getOutputCommitter(context).getClass(),
141      "Should use custom committer");
142  }
143
144  // Simple dummy committer for testing
145  public static class DummyCommitter extends OutputCommitter {
146    @Override
147    public void setupJob(JobContext jobContext) {
148    }
149
150    @Override
151    public void setupTask(TaskAttemptContext taskContext) {
152    }
153
154    @Override
155    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
156      return false;
157    }
158
159    @Override
160    public void commitTask(TaskAttemptContext taskContext) {
161    }
162
163    @Override
164    public void abortTask(TaskAttemptContext taskContext) {
165    }
166  }
167}