001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.times;
026import static org.mockito.Mockito.verify;
027import static org.mockito.Mockito.verifyNoInteractions;
028import static org.mockito.Mockito.verifyNoMoreInteractions;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.List;
033import java.util.concurrent.atomic.AtomicBoolean;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
039import org.apache.hadoop.hbase.testclassification.MapReduceTests;
040import org.apache.hadoop.hbase.testclassification.SmallTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.mapred.JobConf;
043import org.apache.hadoop.mapred.OutputCollector;
044import org.apache.hadoop.mapred.Reporter;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047
048import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
049
050@Tag(MapReduceTests.TAG)
051@Tag(SmallTests.TAG)
052public class TestGroupingTableMap {
053
054  @Test
055  @SuppressWarnings("unchecked")
056  public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes() throws Exception {
057    GroupingTableMap gTableMap = null;
058    try {
059      Result result = mock(Result.class);
060      Reporter reporter = mock(Reporter.class);
061      gTableMap = new GroupingTableMap();
062      Configuration cfg = new Configuration();
063      cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
064      JobConf jobConf = new JobConf(cfg);
065      gTableMap.configure(jobConf);
066
067      byte[] row = {};
068      List<Cell> keyValues = ImmutableList.<Cell> of(
069        new KeyValue(row, Bytes.toBytes("familyA"), Bytes.toBytes("qualifierA"),
070          Bytes.toBytes("1111")),
071        new KeyValue(row, Bytes.toBytes("familyA"), Bytes.toBytes("qualifierA"),
072          Bytes.toBytes("2222")),
073        new KeyValue(row, Bytes.toBytes("familyB"), Bytes.toBytes("qualifierB"),
074          Bytes.toBytes("3333")));
075      when(result.listCells()).thenReturn(keyValues);
076      OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
077        mock(OutputCollector.class);
078      gTableMap.map(null, result, outputCollectorMock, reporter);
079      verify(result).listCells();
080      verifyNoInteractions(outputCollectorMock);
081    } finally {
082      if (gTableMap != null) gTableMap.close();
083    }
084  }
085
086  @Test
087  @SuppressWarnings("unchecked")
088  public void shouldCreateNewKeyAlthoughExtraKey() throws Exception {
089    GroupingTableMap gTableMap = null;
090    try {
091      Result result = mock(Result.class);
092      Reporter reporter = mock(Reporter.class);
093      gTableMap = new GroupingTableMap();
094      Configuration cfg = new Configuration();
095      cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
096      JobConf jobConf = new JobConf(cfg);
097      gTableMap.configure(jobConf);
098
099      byte[] row = {};
100      List<Cell> keyValues = ImmutableList.<Cell> of(
101        new KeyValue(row, Bytes.toBytes("familyA"), Bytes.toBytes("qualifierA"),
102          Bytes.toBytes("1111")),
103        new KeyValue(row, Bytes.toBytes("familyB"), Bytes.toBytes("qualifierB"),
104          Bytes.toBytes("2222")),
105        new KeyValue(row, Bytes.toBytes("familyC"), Bytes.toBytes("qualifierC"),
106          Bytes.toBytes("3333")));
107      when(result.listCells()).thenReturn(keyValues);
108      OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
109        mock(OutputCollector.class);
110      gTableMap.map(null, result, outputCollectorMock, reporter);
111      verify(result).listCells();
112      verify(outputCollectorMock, times(1)).collect(any(), any());
113      verifyNoMoreInteractions(outputCollectorMock);
114    } finally {
115      if (gTableMap != null) gTableMap.close();
116    }
117  }
118
119  @Test
120  public void shouldCreateNewKey() throws Exception {
121    GroupingTableMap gTableMap = null;
122    try {
123      Result result = mock(Result.class);
124      Reporter reporter = mock(Reporter.class);
125      final byte[] bSeparator = Bytes.toBytes(" ");
126      gTableMap = new GroupingTableMap();
127      Configuration cfg = new Configuration();
128      cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
129      JobConf jobConf = new JobConf(cfg);
130      gTableMap.configure(jobConf);
131
132      final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945");
133      final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437");
134      byte[] row = {};
135      List<Cell> cells = ImmutableList.<Cell> of(
136        new KeyValue(row, Bytes.toBytes("familyA"), Bytes.toBytes("qualifierA"), firstPartKeyValue),
137        new KeyValue(row, Bytes.toBytes("familyB"), Bytes.toBytes("qualifierB"),
138          secondPartKeyValue));
139      when(result.listCells()).thenReturn(cells);
140
141      final AtomicBoolean outputCollected = new AtomicBoolean();
142      OutputCollector<ImmutableBytesWritable, Result> outputCollector =
143        new OutputCollector<ImmutableBytesWritable, Result>() {
144          @Override
145          public void collect(ImmutableBytesWritable arg, Result result) throws IOException {
146            assertArrayEquals(org.apache.hbase.thirdparty.com.google.common.primitives.Bytes
147              .concat(firstPartKeyValue, bSeparator, secondPartKeyValue), arg.copyBytes());
148            outputCollected.set(true);
149          }
150        };
151
152      gTableMap.map(null, result, outputCollector, reporter);
153      verify(result).listCells();
154      assertTrue(outputCollected.get(), "Output not received");
155
156      final byte[] firstPartValue = Bytes.toBytes("238947928");
157      final byte[] secondPartValue = Bytes.toBytes("4678456942345");
158      byte[][] data = { firstPartValue, secondPartValue };
159      ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data);
160      assertArrayEquals(org.apache.hbase.thirdparty.com.google.common.primitives.Bytes
161        .concat(firstPartValue, bSeparator, secondPartValue), byteWritable.get());
162    } finally {
163      if (gTableMap != null) gTableMap.close();
164    }
165  }
166
167  @Test
168  public void shouldReturnNullFromCreateGroupKey() throws Exception {
169    GroupingTableMap gTableMap = null;
170    try {
171      gTableMap = new GroupingTableMap();
172      assertNull(gTableMap.createGroupKey(null));
173    } finally {
174      if (gTableMap != null) gTableMap.close();
175    }
176  }
177}