001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021
022import com.google.protobuf.ByteString;
023import com.google.protobuf.ServiceException;
024import java.io.IOException;
025import java.util.Map;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestingUtility;
028import org.apache.hadoop.hbase.HColumnDescriptor;
029import org.apache.hadoop.hbase.HTableDescriptor;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.client.coprocessor.Batch;
035import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
036import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
037import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.junit.AfterClass;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.rules.TestName;
047
048@Category({CoprocessorTests.class, MediumTests.class})
049public class TestCoprocessorTableEndpoint {
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052      HBaseClassTestRule.forClass(TestCoprocessorTableEndpoint.class);
053
054  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
055  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
056  private static final byte[] ROW = Bytes.toBytes("testRow");
057  private static final int ROWSIZE = 20;
058  private static final int rowSeperator1 = 5;
059  private static final int rowSeperator2 = 12;
060  private static final byte[][] ROWS = makeN(ROW, ROWSIZE);
061
062  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
063
064  @Rule
065  public TestName name = new TestName();
066
067  @BeforeClass
068  public static void setupBeforeClass() throws Exception {
069    TEST_UTIL.startMiniCluster(2);
070  }
071
072  @AfterClass
073  public static void tearDownAfterClass() throws Exception {
074    TEST_UTIL.shutdownMiniCluster();
075  }
076
077  @Test
078  public void testCoprocessorTableEndpoint() throws Throwable {
079    final TableName tableName = TableName.valueOf(name.getMethodName());
080
081    HTableDescriptor desc = new HTableDescriptor(tableName);
082    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
083    desc.addCoprocessor(ColumnAggregationEndpoint.class.getName());
084
085    createTable(desc);
086    verifyTable(tableName);
087  }
088
089  @Test
090  public void testDynamicCoprocessorTableEndpoint() throws Throwable {
091    final TableName tableName = TableName.valueOf(name.getMethodName());
092
093    HTableDescriptor desc = new HTableDescriptor(tableName);
094    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
095
096    createTable(desc);
097
098    desc.addCoprocessor(ColumnAggregationEndpoint.class.getName());
099    updateTable(desc);
100
101    verifyTable(tableName);
102  }
103
104  private static byte[][] makeN(byte[] base, int n) {
105    byte[][] ret = new byte[n][];
106    for (int i = 0; i < n; i++) {
107      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
108    }
109    return ret;
110  }
111
112  private static Map<byte [], Long> sum(final Table table, final byte [] family,
113    final byte [] qualifier, final byte [] start, final byte [] end)
114      throws ServiceException, Throwable {
115    return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
116      start, end,
117      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
118        @Override
119        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
120          throws IOException {
121          CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
122              new CoprocessorRpcUtils.BlockingRpcCallback<>();
123          ColumnAggregationProtos.SumRequest.Builder builder =
124            ColumnAggregationProtos.SumRequest.newBuilder();
125          builder.setFamily(ByteString.copyFrom(family));
126          if (qualifier != null && qualifier.length > 0) {
127            builder.setQualifier(ByteString.copyFrom(qualifier));
128          }
129          instance.sum(null, builder.build(), rpcCallback);
130          return rpcCallback.get().getSum();
131        }
132      });
133  }
134
135  private static final void createTable(HTableDescriptor desc) throws Exception {
136    Admin admin = TEST_UTIL.getAdmin();
137    admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
138    TEST_UTIL.waitUntilAllRegionsAssigned(desc.getTableName());
139    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
140    try {
141      for (int i = 0; i < ROWSIZE; i++) {
142        Put put = new Put(ROWS[i]);
143        put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
144        table.put(put);
145      }
146    } finally {
147      table.close();
148    }
149  }
150
151  private static void updateTable(HTableDescriptor desc) throws Exception {
152    Admin admin = TEST_UTIL.getAdmin();
153    admin.disableTable(desc.getTableName());
154    admin.modifyTable(desc.getTableName(), desc);
155    admin.enableTable(desc.getTableName());
156  }
157
158  private static final void verifyTable(TableName tableName) throws Throwable {
159    Table table = TEST_UTIL.getConnection().getTable(tableName);
160    try {
161      Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
162        ROWS[ROWS.length-1]);
163      int sumResult = 0;
164      int expectedResult = 0;
165      for (Map.Entry<byte[], Long> e : results.entrySet()) {
166        sumResult += e.getValue();
167      }
168      for (int i = 0; i < ROWSIZE; i++) {
169        expectedResult += i;
170      }
171      assertEquals("Invalid result", expectedResult, sumResult);
172
173      // scan: for region 2 and region 3
174      results.clear();
175      results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length-1]);
176      sumResult = 0;
177      expectedResult = 0;
178      for (Map.Entry<byte[], Long> e : results.entrySet()) {
179        sumResult += e.getValue();
180      }
181      for (int i = rowSeperator1; i < ROWSIZE; i++) {
182        expectedResult += i;
183      }
184      assertEquals("Invalid result", expectedResult, sumResult);
185    } finally {
186      table.close();
187    }
188  }
189}