001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertNull;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.times;
025import static org.mockito.Mockito.verify;
026import static org.mockito.Mockito.verifyNoInteractions;
027import static org.mockito.Mockito.verifyNoMoreInteractions;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.List;
032import java.util.concurrent.atomic.AtomicBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
039import org.apache.hadoop.hbase.testclassification.MapReduceTests;
040import org.apache.hadoop.hbase.testclassification.SmallTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.mapred.JobConf;
043import org.apache.hadoop.mapred.OutputCollector;
044import org.apache.hadoop.mapred.Reporter;
045import org.junit.Assert;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
051
052@Category({ MapReduceTests.class, SmallTests.class })
053public class TestGroupingTableMap {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestGroupingTableMap.class);
058
059  @Test
060  @SuppressWarnings("unchecked")
061  public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes() throws Exception {
062    GroupingTableMap gTableMap = null;
063    try {
064      Result result = mock(Result.class);
065      Reporter reporter = mock(Reporter.class);
066      gTableMap = new GroupingTableMap();
067      Configuration cfg = new Configuration();
068      cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
069      JobConf jobConf = new JobConf(cfg);
070      gTableMap.configure(jobConf);
071
072      byte[] row = {};
073      List<Cell> keyValues = ImmutableList.<Cell> of(
074        new KeyValue(row, Bytes.toBytes("familyA"), Bytes.toBytes("qualifierA"),
075          Bytes.toBytes("1111")),
076        new KeyValue(row, Bytes.toBytes("familyA"), Bytes.toBytes("qualifierA"),
077          Bytes.toBytes("2222")),
078        new KeyValue(row, Bytes.toBytes("familyB"), Bytes.toBytes("qualifierB"),
079          Bytes.toBytes("3333")));
080      when(result.listCells()).thenReturn(keyValues);
081      OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
082        mock(OutputCollector.class);
083      gTableMap.map(null, result, outputCollectorMock, reporter);
084      verify(result).listCells();
085      verifyNoInteractions(outputCollectorMock);
086    } finally {
087      if (gTableMap != null) gTableMap.close();
088    }
089  }
090
091  @Test
092  @SuppressWarnings("unchecked")
093  public void shouldCreateNewKeyAlthoughExtraKey() throws Exception {
094    GroupingTableMap gTableMap = null;
095    try {
096      Result result = mock(Result.class);
097      Reporter reporter = mock(Reporter.class);
098      gTableMap = new GroupingTableMap();
099      Configuration cfg = new Configuration();
100      cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
101      JobConf jobConf = new JobConf(cfg);
102      gTableMap.configure(jobConf);
103
104      byte[] row = {};
105      List<Cell> keyValues = ImmutableList.<Cell> of(
106        new KeyValue(row, Bytes.toBytes("familyA"), Bytes.toBytes("qualifierA"),
107          Bytes.toBytes("1111")),
108        new KeyValue(row, Bytes.toBytes("familyB"), Bytes.toBytes("qualifierB"),
109          Bytes.toBytes("2222")),
110        new KeyValue(row, Bytes.toBytes("familyC"), Bytes.toBytes("qualifierC"),
111          Bytes.toBytes("3333")));
112      when(result.listCells()).thenReturn(keyValues);
113      OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
114        mock(OutputCollector.class);
115      gTableMap.map(null, result, outputCollectorMock, reporter);
116      verify(result).listCells();
117      verify(outputCollectorMock, times(1)).collect(any(), any());
118      verifyNoMoreInteractions(outputCollectorMock);
119    } finally {
120      if (gTableMap != null) gTableMap.close();
121    }
122  }
123
124  @Test
125  public void shouldCreateNewKey() throws Exception {
126    GroupingTableMap gTableMap = null;
127    try {
128      Result result = mock(Result.class);
129      Reporter reporter = mock(Reporter.class);
130      final byte[] bSeparator = Bytes.toBytes(" ");
131      gTableMap = new GroupingTableMap();
132      Configuration cfg = new Configuration();
133      cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
134      JobConf jobConf = new JobConf(cfg);
135      gTableMap.configure(jobConf);
136
137      final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945");
138      final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437");
139      byte[] row = {};
140      List<Cell> cells = ImmutableList.<Cell> of(
141        new KeyValue(row, Bytes.toBytes("familyA"), Bytes.toBytes("qualifierA"), firstPartKeyValue),
142        new KeyValue(row, Bytes.toBytes("familyB"), Bytes.toBytes("qualifierB"),
143          secondPartKeyValue));
144      when(result.listCells()).thenReturn(cells);
145
146      final AtomicBoolean outputCollected = new AtomicBoolean();
147      OutputCollector<ImmutableBytesWritable, Result> outputCollector =
148        new OutputCollector<ImmutableBytesWritable, Result>() {
149          @Override
150          public void collect(ImmutableBytesWritable arg, Result result) throws IOException {
151            assertArrayEquals(org.apache.hbase.thirdparty.com.google.common.primitives.Bytes
152              .concat(firstPartKeyValue, bSeparator, secondPartKeyValue), arg.copyBytes());
153            outputCollected.set(true);
154          }
155        };
156
157      gTableMap.map(null, result, outputCollector, reporter);
158      verify(result).listCells();
159      Assert.assertTrue("Output not received", outputCollected.get());
160
161      final byte[] firstPartValue = Bytes.toBytes("238947928");
162      final byte[] secondPartValue = Bytes.toBytes("4678456942345");
163      byte[][] data = { firstPartValue, secondPartValue };
164      ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data);
165      assertArrayEquals(org.apache.hbase.thirdparty.com.google.common.primitives.Bytes
166        .concat(firstPartValue, bSeparator, secondPartValue), byteWritable.get());
167    } finally {
168      if (gTableMap != null) gTableMap.close();
169    }
170  }
171
172  @Test
173  public void shouldReturnNullFromCreateGroupKey() throws Exception {
174    GroupingTableMap gTableMap = null;
175    try {
176      gTableMap = new GroupingTableMap();
177      assertNull(gTableMap.createGroupKey(null));
178    } finally {
179      if (gTableMap != null) gTableMap.close();
180    }
181  }
182}