001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import com.google.protobuf.ByteString;
024import com.google.protobuf.ServiceException;
025import java.util.Collections;
026import java.util.Map;
027import java.util.TreeMap;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HColumnDescriptor;
032import org.apache.hadoop.hbase.HTableDescriptor;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.client.coprocessor.Batch;
038import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
039import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
040import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos;
041import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest;
042import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse;
043import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest;
044import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse;
045import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse;
046import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.AfterClass;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057/**
058 * TestEndpoint: test cases to verify the batch execution of coprocessor Endpoint
059 */
060@Category({CoprocessorTests.class, MediumTests.class})
061public class TestBatchCoprocessorEndpoint {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestBatchCoprocessorEndpoint.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestBatchCoprocessorEndpoint.class);
068
069  private static final TableName TEST_TABLE =
070      TableName.valueOf("TestTable");
071  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
072  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
073  private static byte[] ROW = Bytes.toBytes("testRow");
074
075  private static final int ROWSIZE = 20;
076  private static final int rowSeperator1 = 5;
077  private static final int rowSeperator2 = 12;
078  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
079
080  private static HBaseTestingUtility util = new HBaseTestingUtility();
081
082  @BeforeClass
083  public static void setupBeforeClass() throws Exception {
084    // set configure to indicate which cp should be loaded
085    Configuration conf = util.getConfiguration();
086    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
087        org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
088        ProtobufCoprocessorService.class.getName(),
089        ColumnAggregationEndpointWithErrors.class.getName(),
090        ColumnAggregationEndpointNullResponse.class.getName());
091    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
092        ProtobufCoprocessorService.class.getName());
093    util.startMiniCluster(2);
094    Admin admin = util.getAdmin();
095    HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
096    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
097    admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
098    util.waitUntilAllRegionsAssigned(TEST_TABLE);
099    admin.close();
100
101    Table table = util.getConnection().getTable(TEST_TABLE);
102    for (int i = 0; i < ROWSIZE; i++) {
103      Put put = new Put(ROWS[i]);
104      put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
105      table.put(put);
106    }
107    table.close();
108  }
109
110  @AfterClass
111  public static void tearDownAfterClass() throws Exception {
112    util.shutdownMiniCluster();
113  }
114
115  @Test
116  public void testAggregationNullResponse() throws Throwable {
117    Table table = util.getConnection().getTable(TEST_TABLE);
118    ColumnAggregationNullResponseSumRequest.Builder builder =
119        ColumnAggregationNullResponseSumRequest
120        .newBuilder();
121    builder.setFamily(ByteString.copyFrom(TEST_FAMILY));
122    if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
123      builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER));
124    }
125    Map<byte[], ColumnAggregationNullResponseSumResponse> results =
126        table.batchCoprocessorService(
127            ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"),
128            builder.build(), ROWS[0], ROWS[ROWS.length - 1],
129            ColumnAggregationNullResponseSumResponse.getDefaultInstance());
130
131    int sumResult = 0;
132    int expectedResult = 0;
133    for (Map.Entry<byte[], ColumnAggregationNullResponseSumResponse> e :
134        results.entrySet()) {
135      LOG.info("Got value " + e.getValue().getSum() + " for region "
136          + Bytes.toStringBinary(e.getKey()));
137      sumResult += e.getValue().getSum();
138    }
139    for (int i = 0; i < rowSeperator2; i++) {
140      expectedResult += i;
141    }
142    assertEquals("Invalid result", expectedResult, sumResult);
143    table.close();
144  }
145
146  private static byte[][] makeN(byte[] base, int n) {
147    byte[][] ret = new byte[n][];
148    for (int i = 0; i < n; i++) {
149      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
150    }
151    return ret;
152  }
153
154  private Map<byte[], SumResponse> sum(final Table table, final byte[] family,
155      final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException,
156      Throwable {
157    ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest
158        .newBuilder();
159    builder.setFamily(ByteString.copyFrom(family));
160    if (qualifier != null && qualifier.length > 0) {
161      builder.setQualifier(ByteString.copyFrom(qualifier));
162    }
163    return table.batchCoprocessorService(
164        ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"),
165        builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance());
166  }
167
168  @Test
169  public void testAggregationWithReturnValue() throws Throwable {
170    Table table = util.getConnection().getTable(TEST_TABLE);
171    Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
172        ROWS[ROWS.length - 1]);
173    int sumResult = 0;
174    int expectedResult = 0;
175    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
176      LOG.info("Got value " + e.getValue().getSum() + " for region "
177          + Bytes.toStringBinary(e.getKey()));
178      sumResult += e.getValue().getSum();
179    }
180    for (int i = 0; i < ROWSIZE; i++) {
181      expectedResult += i;
182    }
183    assertEquals("Invalid result", expectedResult, sumResult);
184
185    results.clear();
186
187    // scan: for region 2 and region 3
188    results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1],
189        ROWS[ROWS.length - 1]);
190    sumResult = 0;
191    expectedResult = 0;
192    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
193      LOG.info("Got value " + e.getValue().getSum() + " for region "
194          + Bytes.toStringBinary(e.getKey()));
195      sumResult += e.getValue().getSum();
196    }
197    for (int i = rowSeperator1; i < ROWSIZE; i++) {
198      expectedResult += i;
199    }
200    assertEquals("Invalid result", expectedResult, sumResult);
201    table.close();
202  }
203
204  @Test
205  public void testAggregation() throws Throwable {
206    Table table = util.getConnection().getTable(TEST_TABLE);
207    Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
208        ROWS[0], ROWS[ROWS.length - 1]);
209    int sumResult = 0;
210    int expectedResult = 0;
211    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
212      LOG.info("Got value " + e.getValue().getSum() + " for region "
213          + Bytes.toStringBinary(e.getKey()));
214      sumResult += e.getValue().getSum();
215    }
216    for (int i = 0; i < ROWSIZE; i++) {
217      expectedResult += i;
218    }
219    assertEquals("Invalid result", expectedResult, sumResult);
220
221    // scan: for region 2 and region 3
222    results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length - 1]);
223    sumResult = 0;
224    expectedResult = 0;
225    for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
226      LOG.info("Got value " + e.getValue().getSum() + " for region "
227          + Bytes.toStringBinary(e.getKey()));
228      sumResult += e.getValue().getSum();
229    }
230    for (int i = rowSeperator1; i < ROWSIZE; i++) {
231      expectedResult += i;
232    }
233    assertEquals("Invalid result", expectedResult, sumResult);
234    table.close();
235  }
236
237  @Test
238  public void testAggregationWithErrors() throws Throwable {
239    Table table = util.getConnection().getTable(TEST_TABLE);
240    final Map<byte[], ColumnAggregationWithErrorsSumResponse> results =
241        Collections.synchronizedMap(
242            new TreeMap<byte[], ColumnAggregationWithErrorsSumResponse>(
243                Bytes.BYTES_COMPARATOR
244            ));
245    ColumnAggregationWithErrorsSumRequest.Builder builder =
246        ColumnAggregationWithErrorsSumRequest
247        .newBuilder();
248    builder.setFamily(ByteString.copyFrom(TEST_FAMILY));
249    if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) {
250      builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER));
251    }
252
253    boolean hasError = false;
254    try {
255      table.batchCoprocessorService(
256          ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor()
257              .findMethodByName("sum"),
258          builder.build(), ROWS[0], ROWS[ROWS.length - 1],
259          ColumnAggregationWithErrorsSumResponse.getDefaultInstance(),
260          new Batch.Callback<ColumnAggregationWithErrorsSumResponse>() {
261
262            @Override
263            public void update(byte[] region, byte[] row,
264                ColumnAggregationWithErrorsSumResponse result) {
265              results.put(region, result);
266            }
267          });
268    } catch (Throwable t) {
269      LOG.info("Exceptions in coprocessor service", t);
270      hasError = true;
271    }
272
273    int sumResult = 0;
274    int expectedResult = 0;
275    for (Map.Entry<byte[], ColumnAggregationWithErrorsSumResponse> e : results.entrySet()) {
276      LOG.info("Got value " + e.getValue().getSum() + " for region "
277          + Bytes.toStringBinary(e.getKey()));
278      sumResult += e.getValue().getSum();
279    }
280    for (int i = 0; i < rowSeperator2; i++) {
281      expectedResult += i;
282    }
283    assertEquals("Invalid result", expectedResult, sumResult);
284    assertTrue(hasError);
285    table.close();
286  }
287}