001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
064
065/**
066 * TestEndpoint: test cases to verify coprocessor Endpoint
067 */
068@Category({CoprocessorTests.class, MediumTests.class})
069public class TestCoprocessorEndpoint {
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072      HBaseClassTestRule.forClass(TestCoprocessorEndpoint.class);
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestCoprocessorEndpoint.class);
075
076  private static final TableName TEST_TABLE =
077      TableName.valueOf("TestCoprocessorEndpoint");
078  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
079  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
080  private static byte[] ROW = Bytes.toBytes("testRow");
081
082  private static final int ROWSIZE = 20;
083  private static final int rowSeperator1 = 5;
084  private static final int rowSeperator2 = 12;
085  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
086
087  private static HBaseTestingUtility util = new HBaseTestingUtility();
088
089  @BeforeClass
090  public static void setupBeforeClass() throws Exception {
091    // set configure to indicate which cp should be loaded
092    Configuration conf = util.getConfiguration();
093    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
094    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
095        org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
096        ProtobufCoprocessorService.class.getName());
097    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
098        ProtobufCoprocessorService.class.getName());
099    util.startMiniCluster(2);
100    Admin admin = util.getAdmin();
101    HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
102    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
103    admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
104    util.waitUntilAllRegionsAssigned(TEST_TABLE);
105
106    Table table = util.getConnection().getTable(TEST_TABLE);
107    for (int i = 0; i < ROWSIZE; i++) {
108      Put put = new Put(ROWS[i]);
109      put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
110      table.put(put);
111    }
112    table.close();
113  }
114
115  @AfterClass
116  public static void tearDownAfterClass() throws Exception {
117    util.shutdownMiniCluster();
118  }
119
120  private Map<byte [], Long> sum(final Table table, final byte [] family,
121          final byte [] qualifier, final byte [] start, final byte [] end)
122          throws ServiceException, Throwable {
123    return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
124        start, end,
125      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
126        @Override
127        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
128          throws IOException {
129          CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
130              new CoprocessorRpcUtils.BlockingRpcCallback<>();
131          ColumnAggregationProtos.SumRequest.Builder builder =
132            ColumnAggregationProtos.SumRequest.newBuilder();
133          builder.setFamily(ByteStringer.wrap(family));
134          if (qualifier != null && qualifier.length > 0) {
135            builder.setQualifier(ByteStringer.wrap(qualifier));
136          }
137          instance.sum(null, builder.build(), rpcCallback);
138          return rpcCallback.get().getSum();
139        }
140      });
141  }
142
143  @Test
144  public void testAggregation() throws Throwable {
145    Table table = util.getConnection().getTable(TEST_TABLE);
146    Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
147      ROWS[0], ROWS[ROWS.length-1]);
148    int sumResult = 0;
149    int expectedResult = 0;
150    for (Map.Entry<byte[], Long> e : results.entrySet()) {
151      LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
152      sumResult += e.getValue();
153    }
154    for (int i = 0; i < ROWSIZE; i++) {
155      expectedResult += i;
156    }
157    assertEquals("Invalid result", expectedResult, sumResult);
158
159    results.clear();
160
161    // scan: for region 2 and region 3
162    results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
163      ROWS[rowSeperator1], ROWS[ROWS.length-1]);
164    sumResult = 0;
165    expectedResult = 0;
166    for (Map.Entry<byte[], Long> e : results.entrySet()) {
167      LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
168      sumResult += e.getValue();
169    }
170    for (int i = rowSeperator1; i < ROWSIZE; i++) {
171      expectedResult += i;
172    }
173    assertEquals("Invalid result", expectedResult, sumResult);
174    table.close();
175  }
176
177  @Test
178  public void testCoprocessorService() throws Throwable {
179    Table table = util.getConnection().getTable(TEST_TABLE);
180
181    List<HRegionLocation> regions;
182    try(RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) {
183      regions = rl.getAllRegionLocations();
184    }
185    final TestProtos.EchoRequestProto request =
186        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
187    final Map<byte[], String> results = Collections.synchronizedMap(
188        new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
189    try {
190      // scan: for all regions
191      final RpcController controller = new ServerRpcController();
192      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
193        ROWS[0], ROWS[ROWS.length - 1],
194        new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
195          @Override
196          public TestProtos.EchoResponseProto call(
197              TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
198            LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
199            CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
200                new CoprocessorRpcUtils.BlockingRpcCallback<>();
201            instance.echo(controller, request, callback);
202            TestProtos.EchoResponseProto response = callback.get();
203            LOG.debug("Batch.Call returning result " + response);
204            return response;
205          }
206        },
207        new Batch.Callback<TestProtos.EchoResponseProto>() {
208          @Override
209          public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
210            assertNotNull(result);
211            assertEquals("hello", result.getMessage());
212            results.put(region, result.getMessage());
213          }
214        }
215      );
216      for (Map.Entry<byte[], String> e : results.entrySet()) {
217        LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
218      }
219      assertEquals(3, results.size());
220      for (HRegionLocation info : regions) {
221        LOG.info("Region info is "+info.getRegionInfo().getRegionNameAsString());
222        assertTrue(results.containsKey(info.getRegionInfo().getRegionName()));
223      }
224      results.clear();
225
226      // scan: for region 2 and region 3
227      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
228        ROWS[rowSeperator1], ROWS[ROWS.length - 1],
229        new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
230          @Override
231          public TestProtos.EchoResponseProto call(
232              TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
233            LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
234            CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
235                new CoprocessorRpcUtils.BlockingRpcCallback<>();
236            instance.echo(controller, request, callback);
237            TestProtos.EchoResponseProto response = callback.get();
238            LOG.debug("Batch.Call returning result " + response);
239            return response;
240          }
241        },
242        new Batch.Callback<TestProtos.EchoResponseProto>() {
243          @Override
244          public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
245            assertNotNull(result);
246            assertEquals("hello", result.getMessage());
247            results.put(region, result.getMessage());
248          }
249        }
250      );
251      for (Map.Entry<byte[], String> e : results.entrySet()) {
252        LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
253      }
254      assertEquals(2, results.size());
255    } finally {
256      table.close();
257    }
258  }
259
260  @Test
261  public void testCoprocessorServiceNullResponse() throws Throwable {
262    Table table = util.getConnection().getTable(TEST_TABLE);
263    List<HRegionLocation> regions;
264    try(RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) {
265      regions = rl.getAllRegionLocations();
266    }
267
268    final TestProtos.EchoRequestProto request =
269        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
270    try {
271      // scan: for all regions
272      final RpcController controller = new ServerRpcController();
273      // test that null results are supported
274      Map<byte[], String> results =
275            table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
276          ROWS[0], ROWS[ROWS.length - 1],
277          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
278            public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
279                throws IOException {
280              CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
281                  new CoprocessorRpcUtils.BlockingRpcCallback<>();
282              instance.echo(controller, request, callback);
283              TestProtos.EchoResponseProto response = callback.get();
284              LOG.debug("Batch.Call got result " + response);
285              return null;
286            }
287          }
288      );
289      for (Map.Entry<byte[], String> e : results.entrySet()) {
290        LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
291      }
292      assertEquals(3, results.size());
293      for (HRegionLocation region : regions) {
294        HRegionInfo info = region.getRegionInfo();
295        LOG.info("Region info is "+info.getRegionNameAsString());
296        assertTrue(results.containsKey(info.getRegionName()));
297        assertNull(results.get(info.getRegionName()));
298      }
299    } finally {
300      table.close();
301    }
302  }
303
304  @Test
305  public void testMasterCoprocessorService() throws Throwable {
306    Admin admin = util.getAdmin();
307    final TestProtos.EchoRequestProto request =
308        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
309    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
310        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
311    assertEquals("hello", service.echo(null, request).getMessage());
312  }
313
314  @Test
315  public void testCoprocessorError() throws Exception {
316    Configuration configuration = new Configuration(util.getConfiguration());
317    // Make it not retry forever
318    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
319    Table table = util.getConnection().getTable(TEST_TABLE);
320
321    try {
322      CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
323
324      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
325          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
326
327      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
328      fail("Should have thrown an exception");
329    } catch (ServiceException e) {
330    } finally {
331      table.close();
332    }
333  }
334
335  @Test
336  public void testMasterCoprocessorError() throws Throwable {
337    Admin admin = util.getAdmin();
338    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
339        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
340    try {
341      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
342      fail("Should have thrown an exception");
343    } catch (ServiceException e) {
344    }
345  }
346
347  private static byte[][] makeN(byte[] base, int n) {
348    byte[][] ret = new byte[n][];
349    for (int i = 0; i < n; i++) {
350      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
351    }
352    return ret;
353  }
354}