001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import com.google.protobuf.ByteString; 024import com.google.protobuf.ServiceException; 025import java.util.Collections; 026import java.util.Map; 027import java.util.TreeMap; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.HColumnDescriptor; 032import org.apache.hadoop.hbase.HTableDescriptor; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.client.coprocessor.Batch; 038import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos; 039import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse; 040import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos; 041import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumRequest; 042import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithErrorsProtos.ColumnAggregationWithErrorsSumResponse; 043import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumRequest; 044import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationNullResponseSumResponse; 045import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationWithNullResponseProtos.ColumnAggregationServiceNullResponse; 046import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.junit.AfterClass; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * TestEndpoint: test cases to verify the batch execution of coprocessor Endpoint 059 */ 060@Category({CoprocessorTests.class, MediumTests.class}) 061public class TestBatchCoprocessorEndpoint { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestBatchCoprocessorEndpoint.class); 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestBatchCoprocessorEndpoint.class); 068 069 private static final TableName TEST_TABLE = 070 TableName.valueOf("TestTable"); 071 private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); 072 private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier"); 073 private static byte[] ROW = Bytes.toBytes("testRow"); 074 075 private static final int ROWSIZE = 20; 076 private static final int rowSeperator1 = 5; 077 private static final int rowSeperator2 = 12; 078 private static byte[][] ROWS = makeN(ROW, ROWSIZE); 079 080 private static HBaseTestingUtility util = new HBaseTestingUtility(); 081 082 @BeforeClass 083 public static void setupBeforeClass() throws Exception { 084 // set configure to indicate which cp should be loaded 085 Configuration conf = util.getConfiguration(); 086 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 087 org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(), 088 ProtobufCoprocessorService.class.getName(), 089 ColumnAggregationEndpointWithErrors.class.getName(), 090 ColumnAggregationEndpointNullResponse.class.getName()); 091 conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 092 ProtobufCoprocessorService.class.getName()); 093 util.startMiniCluster(2); 094 Admin admin = util.getAdmin(); 095 HTableDescriptor desc = new HTableDescriptor(TEST_TABLE); 096 desc.addFamily(new HColumnDescriptor(TEST_FAMILY)); 097 admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]}); 098 util.waitUntilAllRegionsAssigned(TEST_TABLE); 099 admin.close(); 100 101 Table table = util.getConnection().getTable(TEST_TABLE); 102 for (int i = 0; i < ROWSIZE; i++) { 103 Put put = new Put(ROWS[i]); 104 put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i)); 105 table.put(put); 106 } 107 table.close(); 108 } 109 110 @AfterClass 111 public static void tearDownAfterClass() throws Exception { 112 util.shutdownMiniCluster(); 113 } 114 115 @Test 116 public void testAggregationNullResponse() throws Throwable { 117 Table table = util.getConnection().getTable(TEST_TABLE); 118 ColumnAggregationNullResponseSumRequest.Builder builder = 119 ColumnAggregationNullResponseSumRequest 120 .newBuilder(); 121 builder.setFamily(ByteString.copyFrom(TEST_FAMILY)); 122 if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) { 123 builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER)); 124 } 125 Map<byte[], ColumnAggregationNullResponseSumResponse> results = 126 table.batchCoprocessorService( 127 ColumnAggregationServiceNullResponse.getDescriptor().findMethodByName("sum"), 128 builder.build(), ROWS[0], ROWS[ROWS.length - 1], 129 ColumnAggregationNullResponseSumResponse.getDefaultInstance()); 130 131 int sumResult = 0; 132 int expectedResult = 0; 133 for (Map.Entry<byte[], ColumnAggregationNullResponseSumResponse> e : 134 results.entrySet()) { 135 LOG.info("Got value " + e.getValue().getSum() + " for region " 136 + Bytes.toStringBinary(e.getKey())); 137 sumResult += e.getValue().getSum(); 138 } 139 for (int i = 0; i < rowSeperator2; i++) { 140 expectedResult += i; 141 } 142 assertEquals("Invalid result", expectedResult, sumResult); 143 table.close(); 144 } 145 146 private static byte[][] makeN(byte[] base, int n) { 147 byte[][] ret = new byte[n][]; 148 for (int i = 0; i < n; i++) { 149 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i))); 150 } 151 return ret; 152 } 153 154 private Map<byte[], SumResponse> sum(final Table table, final byte[] family, 155 final byte[] qualifier, final byte[] start, final byte[] end) throws ServiceException, 156 Throwable { 157 ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest 158 .newBuilder(); 159 builder.setFamily(ByteString.copyFrom(family)); 160 if (qualifier != null && qualifier.length > 0) { 161 builder.setQualifier(ByteString.copyFrom(qualifier)); 162 } 163 return table.batchCoprocessorService( 164 ColumnAggregationProtos.ColumnAggregationService.getDescriptor().findMethodByName("sum"), 165 builder.build(), start, end, ColumnAggregationProtos.SumResponse.getDefaultInstance()); 166 } 167 168 @Test 169 public void testAggregationWithReturnValue() throws Throwable { 170 Table table = util.getConnection().getTable(TEST_TABLE); 171 Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], 172 ROWS[ROWS.length - 1]); 173 int sumResult = 0; 174 int expectedResult = 0; 175 for (Map.Entry<byte[], SumResponse> e : results.entrySet()) { 176 LOG.info("Got value " + e.getValue().getSum() + " for region " 177 + Bytes.toStringBinary(e.getKey())); 178 sumResult += e.getValue().getSum(); 179 } 180 for (int i = 0; i < ROWSIZE; i++) { 181 expectedResult += i; 182 } 183 assertEquals("Invalid result", expectedResult, sumResult); 184 185 results.clear(); 186 187 // scan: for region 2 and region 3 188 results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], 189 ROWS[ROWS.length - 1]); 190 sumResult = 0; 191 expectedResult = 0; 192 for (Map.Entry<byte[], SumResponse> e : results.entrySet()) { 193 LOG.info("Got value " + e.getValue().getSum() + " for region " 194 + Bytes.toStringBinary(e.getKey())); 195 sumResult += e.getValue().getSum(); 196 } 197 for (int i = rowSeperator1; i < ROWSIZE; i++) { 198 expectedResult += i; 199 } 200 assertEquals("Invalid result", expectedResult, sumResult); 201 table.close(); 202 } 203 204 @Test 205 public void testAggregation() throws Throwable { 206 Table table = util.getConnection().getTable(TEST_TABLE); 207 Map<byte[], SumResponse> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, 208 ROWS[0], ROWS[ROWS.length - 1]); 209 int sumResult = 0; 210 int expectedResult = 0; 211 for (Map.Entry<byte[], SumResponse> e : results.entrySet()) { 212 LOG.info("Got value " + e.getValue().getSum() + " for region " 213 + Bytes.toStringBinary(e.getKey())); 214 sumResult += e.getValue().getSum(); 215 } 216 for (int i = 0; i < ROWSIZE; i++) { 217 expectedResult += i; 218 } 219 assertEquals("Invalid result", expectedResult, sumResult); 220 221 // scan: for region 2 and region 3 222 results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length - 1]); 223 sumResult = 0; 224 expectedResult = 0; 225 for (Map.Entry<byte[], SumResponse> e : results.entrySet()) { 226 LOG.info("Got value " + e.getValue().getSum() + " for region " 227 + Bytes.toStringBinary(e.getKey())); 228 sumResult += e.getValue().getSum(); 229 } 230 for (int i = rowSeperator1; i < ROWSIZE; i++) { 231 expectedResult += i; 232 } 233 assertEquals("Invalid result", expectedResult, sumResult); 234 table.close(); 235 } 236 237 @Test 238 public void testAggregationWithErrors() throws Throwable { 239 Table table = util.getConnection().getTable(TEST_TABLE); 240 final Map<byte[], ColumnAggregationWithErrorsSumResponse> results = 241 Collections.synchronizedMap( 242 new TreeMap<byte[], ColumnAggregationWithErrorsSumResponse>( 243 Bytes.BYTES_COMPARATOR 244 )); 245 ColumnAggregationWithErrorsSumRequest.Builder builder = 246 ColumnAggregationWithErrorsSumRequest 247 .newBuilder(); 248 builder.setFamily(ByteString.copyFrom(TEST_FAMILY)); 249 if (TEST_QUALIFIER != null && TEST_QUALIFIER.length > 0) { 250 builder.setQualifier(ByteString.copyFrom(TEST_QUALIFIER)); 251 } 252 253 boolean hasError = false; 254 try { 255 table.batchCoprocessorService( 256 ColumnAggregationWithErrorsProtos.ColumnAggregationServiceWithErrors.getDescriptor() 257 .findMethodByName("sum"), 258 builder.build(), ROWS[0], ROWS[ROWS.length - 1], 259 ColumnAggregationWithErrorsSumResponse.getDefaultInstance(), 260 new Batch.Callback<ColumnAggregationWithErrorsSumResponse>() { 261 262 @Override 263 public void update(byte[] region, byte[] row, 264 ColumnAggregationWithErrorsSumResponse result) { 265 results.put(region, result); 266 } 267 }); 268 } catch (Throwable t) { 269 LOG.info("Exceptions in coprocessor service", t); 270 hasError = true; 271 } 272 273 int sumResult = 0; 274 int expectedResult = 0; 275 for (Map.Entry<byte[], ColumnAggregationWithErrorsSumResponse> e : results.entrySet()) { 276 LOG.info("Got value " + e.getValue().getSum() + " for region " 277 + Bytes.toStringBinary(e.getKey())); 278 sumResult += e.getValue().getSum(); 279 } 280 for (int i = 0; i < rowSeperator2; i++) { 281 expectedResult += i; 282 } 283 assertEquals("Invalid result", expectedResult, sumResult); 284 assertTrue(hasError); 285 table.close(); 286 } 287}